From 4e4817f26b0849f7346bca168c580d77134e8506 Mon Sep 17 00:00:00 2001 From: Anders Fugmann Date: Thu, 1 May 2025 22:34:34 +0200 Subject: [PATCH 1/8] Fix deprecation warning on use of Async_kernel.Ivar.fill --- zmq-async/src/deferred.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zmq-async/src/deferred.ml b/zmq-async/src/deferred.ml index fb46f10..b9cb1de 100644 --- a/zmq-async/src/deferred.ml +++ b/zmq-async/src/deferred.ml @@ -26,7 +26,7 @@ end module Mailbox = struct type 'a t = 'a Async_kernel.Ivar.t let create () = Async_kernel.Ivar.create () - let send t v = Async_kernel.Ivar.fill t v + let send t v = Async_kernel.Ivar.fill_exn t v let recv t = Async_kernel.Ivar.read t end From 246ecb2e09b9454389714366e497bd61d690591a Mon Sep 17 00:00:00 2001 From: Anders Fugmann Date: Thu, 1 May 2025 22:35:16 +0200 Subject: [PATCH 2/8] Make resumable versions of recv_*_all to allow gracefull handling of EINTR and EAGAIN --- zmq/src/zmq.ml | 48 +++++++++++++++++++++++++++++++++++++++ zmq/src/zmq.mli | 53 +++++++++++++++++++++++++++++++++++--------- zmq/test/zmq_test.ml | 15 +++++++++---- 3 files changed, 101 insertions(+), 15 deletions(-) diff --git a/zmq/src/zmq.ml b/zmq/src/zmq.ml index 5b5f3bf..d590552 100644 --- a/zmq/src/zmq.ml +++ b/zmq/src/zmq.ml @@ -492,6 +492,42 @@ module Socket = struct type event = No_event | Poll_in | Poll_out | Poll_in_out | Poll_error external events : 'a t -> event = "caml_zmq_get_events" + type 'a resumable = unit -> 'a + + (** Allow resuming receive of a multipart message. + The function returns a function that can be "resumed" in case of EGAGIN or EINTR. + *) + let recv_all_wrapper_resumable: (?block:bool -> _ t -> 'a) -> ?block:bool -> _ t -> 'a list resumable = fun f ?block socket -> + let received = ref [] in + let rec cont f ?block () = + let message = f ?block socket in + received := message :: !received; + match has_more socket with + | true -> + cont f ~block:false () + | false -> + (* Convert the queue to a list *) + List.rev !received + in + cont f ?block + + (** Allow resuming send of a multipart message. + The function returns a function that can be "resumed" in case of EGAGIN or EINTR. + *) + let send_all_wrapper_resumable: (?block:bool -> ?more:bool -> _ t -> 'a -> unit) -> ?block:bool -> _ t -> 'a list -> unit resumable = fun f ?block socket messages -> + let messages = ref messages in + let rec cont (f: (?block:bool -> ?more:bool -> _ t -> 'a -> unit)) ?block socket () = + match !messages with + | [] -> () + | [ msg ] -> + f ?block ~more:false socket msg + | msg :: msgs -> + f ?block ~more:true socket msg; + messages := msgs; + cont f ~block:true socket () + in + cont f ?block socket + let recv_all_wrapper (f : ?block:bool -> _ t -> _) = (* Once the first message part is received all remaining message parts can be received without blocking. *) @@ -529,14 +565,26 @@ module Socket = struct let recv_all ?block socket = recv_all_wrapper recv ?block socket + let recv_all_r ?block socket = + recv_all_wrapper_resumable recv ?block socket + let send_all ?block socket message = send_all_wrapper send ?block socket message + let send_all_r ?block socket message = + send_all_wrapper_resumable send ?block socket message + let recv_msg_all ?block socket = recv_all_wrapper recv_msg ?block socket + let recv_msg_all_r ?block socket = + recv_all_wrapper_resumable recv_msg ?block socket + let send_msg_all ?block socket message = send_all_wrapper send_msg ?block socket message + + let send_msg_all_r ?block socket message = + send_all_wrapper_resumable send_msg ?block socket message end module Proxy = struct diff --git a/zmq/src/zmq.mli b/zmq/src/zmq.mli index 68e7cbc..c73ca1a 100644 --- a/zmq/src/zmq.mli +++ b/zmq/src/zmq.mli @@ -95,6 +95,10 @@ module Socket : sig val bind : 'a t -> string -> unit val unbind : 'a t -> string -> unit + (** Resumable. + Allows repeated calls until the operation has completed. Usefull to repeat calls if EINTR or EGAGIN is raised *) + type 'a resumable = unit -> 'a + (** Read a message from the socket. block indicates if the call should be blocking or non-blocking. If block is [false], [recv] will raise [Unix.Unix_error (Unix.EAGAIN, _, _)] if there are no messages available to receive on the specified socket. @@ -103,51 +107,78 @@ module Socket : sig val recv : ?block:bool -> 'a t -> string (** Read a complete multipart message from the socket. - block indicates if the call should be blocking or non-blocking. Default true + @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. + @deprecated Use [recv_all_r] to allow resuming the operation *) val recv_all : ?block:bool -> 'a t -> string list + (** Read a complete multipart message from the socket. + @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. + The function returns a [resumable] to allow resuming the operation in case of EINTR or EGAGIN. + *) + val recv_all_r : ?block:bool -> 'a t -> string list resumable + (** Send a message to the socket. - block indicates if the call should be blocking or non-blocking. Default true - more is used for multipart messages, and indicates that the more message parts will follow. Default false + @param block indicates if the call should be blocking or non-blocking. + Defaults to [true]. + @param more indicate that more messages will follow and will be joined into a multipart message. + Defaults to [true]. *) val send : ?block:bool -> ?more:bool -> 'a t -> string -> unit (** Send a multipart message to the socket. - block indicates if the call should be blocking or non-blocking. Default true + @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. + @deprecated Use [send_all_r] to allow resuming the operation *) val send_all : ?block:bool -> 'a t -> string list -> unit - (** Receive a {!Msg.t} on the socket. + (** Send a multipart message to the socket. + @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. + The function returns a [resumable] to allow resuming the operation in case of EINTR or EGAGIN. + *) + val send_all_r : ?block:bool -> 'a t -> string list -> unit resumable + + (** Receive a {!Msg.t} on the socket. @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. *) val recv_msg : ?block:bool -> 'a t -> Msg.t (** Receive a multi-part message on the socket. - - @param block indicates if the call should be blocking or non-blocking. - Defaults to [true]. + @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. + @deprecated Use [recv_msg_all_r] to allow resuming the operation *) val recv_msg_all : ?block:bool -> 'a t -> Msg.t list - (** Send a {!Msg.t} to the socket. + (** Receive a multi-part message on the socket. + @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. + The function returns a [resumable] to allow resuming the operation in case of EINTR or EGAGIN. + *) + val recv_msg_all_r : ?block:bool -> 'a t -> Msg.t list resumable +(** Send a {!Msg.t} to the socket. @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. @param more is used for multipart messages Set to [true] to indicate that - more message parts will follow. Defaults to [false]. + more message parts will follow. Defaults to [false]. *) val send_msg : ?block:bool -> ?more:bool -> 'a t -> Msg.t -> unit (** Send a multi-part message to the socket. - @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. + @deprecated Use [send_msg_all_r] to allow resuming the operation *) val send_msg_all : ?block:bool -> 'a t -> Msg.t list -> unit + (** Send a multi-part message to the socket. + @param block indicates if the call should be blocking or non-blocking. + Defaults to [true]. + The function returns a [resumable] to allow resuming the operation in case of EINTR or EGAGIN. + *) + val send_msg_all_r : ?block:bool -> 'a t -> Msg.t list -> unit resumable + (** Option Getter and Setters *) (** Set the maximum message size of a message sent in this context, diff --git a/zmq/test/zmq_test.ml b/zmq/test/zmq_test.ml index c82038b..57a2afe 100644 --- a/zmq/test/zmq_test.ml +++ b/zmq/test/zmq_test.ml @@ -311,11 +311,18 @@ let suite = let endpoint = "inproc://endpoint" in bind rep endpoint; connect req endpoint; - send_all req ["request"; "and more"]; - let msg = recv_all rep in + let send = send_all_r req ["request"; "and more"] in + send (); + + + let recv = recv_all_r rep in + let msg = recv () in assert_equal ["request"; "and more"] msg; - send_all rep ["reply"; "and more"]; - let msg = recv_all req in + let send = send_all_r rep ["reply"; "and more"] in + send (); + let recv = recv_all_r req in + let msg = recv () in + assert_equal ["reply"; "and more"] msg ) (fun (ctx, req, rep) -> From 8b1ee7600624cd35f83be2141af730dde2142735 Mon Sep 17 00:00:00 2001 From: Anders Fugmann Date: Thu, 1 May 2025 22:50:36 +0200 Subject: [PATCH 3/8] Handle _all functions correctly --- zmq-deferred/src/socket.ml | 41 +++++++++++++------------------------- 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/zmq-deferred/src/socket.ml b/zmq-deferred/src/socket.ml index 0d34713..a9d5e2d 100644 --- a/zmq-deferred/src/socket.ml +++ b/zmq-deferred/src/socket.ml @@ -159,11 +159,11 @@ module Make(T: Deferred.T) = struct t type op = Send | Receive - let post: _ t -> op -> (_ Zmq.Socket.t -> 'a) -> 'a Deferred.t = fun t op f -> - let f' mailbox () = - let res = match f t.socket with + let post: _ t -> op -> (_ Zmq.Socket.t -> unit -> 'a) -> 'a Deferred.t = fun t op f -> + let f' f mailbox () = + let res = match f () with | v -> Ok v - | exception Unix.Unix_error (Unix.EAGAIN, _, _) -> + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EINTR), _, _) -> (* Signal try again *) raise Retry | exception exn -> Error exn @@ -176,7 +176,7 @@ module Make(T: Deferred.T) = struct in let mailbox = Mailbox.create () in let should_signal = Queue.is_empty queue in - Queue.push (f' mailbox) queue; + Queue.push (f' (f t.socket) mailbox) queue; (* Wakeup the thread if the queue was empty *) begin @@ -191,38 +191,25 @@ module Make(T: Deferred.T) = struct let to_socket t = t.socket - let recv s = post s Receive (fun s -> Zmq.Socket.recv ~block:false s) - let send s m = post s Send (fun s -> Zmq.Socket.send ~block:false s m) + let recv s = post s Receive (fun s () -> Zmq.Socket.recv ~block:false s) + let send s m = post s Send (fun s () -> Zmq.Socket.send ~block:false s m) - let recv_msg s = post s Receive (fun s -> Zmq.Socket.recv_msg ~block:false s) + let recv_msg s = post s Receive (fun s () -> Zmq.Socket.recv_msg ~block:false s) let send_msg s m = - post s Send (fun s -> Zmq.Socket.send_msg ~block:false s m) + post s Send (fun s () -> Zmq.Socket.send_msg ~block:false s m) (** Recevie all message blocks. *) let recv_all s = - (* The documentaton says that either all message parts are - transmitted, or none. So once a message becomes available, all - parts can be read wothout blocking. - - Also receiving a multipart message must not be interleaved with - another receving thread on the same socket. - - We could have a read-mutex and a write mutex in order to limit - potential starvation of other threads while reading large - multipart messages. - - *) - post s Receive (fun s -> Zmq.Socket.recv_all ~block:false s) + post s Receive (fun s -> Zmq.Socket.recv_all_r ~block:false s) let send_all s parts = - (* See the comment in recv_all. *) - post s Send (fun s -> Zmq.Socket.send_all ~block:false s parts) + post s Send (fun s -> Zmq.Socket.send_all_r ~block:false s parts) let recv_msg_all s = - post s Receive (fun s -> Zmq.Socket.recv_msg_all ~block:false s) + post s Receive (fun s -> Zmq.Socket.recv_msg_all_r ~block:false s) let send_msg_all s parts = - post s Send (fun s -> Zmq.Socket.send_msg_all ~block:false s parts) + post s Send (fun s -> Zmq.Socket.send_msg_all_r ~block:false s parts) let close t = t.closing <- true; @@ -247,7 +234,7 @@ module Make(T: Deferred.T) = struct end module Monitor = struct - let recv s = post s Receive (fun s -> Zmq.Monitor.recv ~block:false s) + let recv s = post s Receive (fun s () -> Zmq.Monitor.recv ~block:false s) end end From 50e8af55f90aa81ac929957fb078ba0c5e69ef45 Mon Sep 17 00:00:00 2001 From: Anders Fugmann Date: Thu, 1 May 2025 23:14:17 +0200 Subject: [PATCH 4/8] Create resumable version of Monitor.recv --- zmq-deferred/src/socket.ml | 2 +- zmq-eio/src/socket.ml | 15 +++++++-------- zmq/src/zmq.ml | 19 ++++++++++++++++++- zmq/src/zmq.mli | 17 ++++++++++++----- 4 files changed, 38 insertions(+), 15 deletions(-) diff --git a/zmq-deferred/src/socket.ml b/zmq-deferred/src/socket.ml index a9d5e2d..9bedf8b 100644 --- a/zmq-deferred/src/socket.ml +++ b/zmq-deferred/src/socket.ml @@ -234,7 +234,7 @@ module Make(T: Deferred.T) = struct end module Monitor = struct - let recv s = post s Receive (fun s () -> Zmq.Monitor.recv ~block:false s) + let recv s = post s Receive (fun s -> Zmq.Monitor.recv_r ~block:false s) end end diff --git a/zmq-eio/src/socket.ml b/zmq-eio/src/socket.ml index a831745..be0874c 100644 --- a/zmq-eio/src/socket.ml +++ b/zmq-eio/src/socket.ml @@ -21,9 +21,8 @@ let process queue = | () -> let (_: unit -> unit) = Queue.pop queue in () - | exception Unix.Unix_error (Unix.EAGAIN, _, _) -> - (* If f raised EAGAIN, dont pop the message. *) - (* This should never happen. If so, the queue could be replaced with a Eio.Stream for faster handling *) + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EINTR), _, _) -> + (* Leave the function on the queue to be retried *) () let with_lock lock f = @@ -123,10 +122,10 @@ let send_msg t message = request t t.senders (fun () -> Zmq.Socket.send_msg ~block:false t.socket message) let send_all t messages = - request t t.senders (fun () -> Zmq.Socket.send_all ~block:false t.socket messages) + request t t.senders (Zmq.Socket.send_all_r ~block:false t.socket messages) let send_msg_all t messages = - request t t.senders (fun () -> Zmq.Socket.send_msg_all ~block:false t.socket messages) + request t t.senders (Zmq.Socket.send_msg_all_r ~block:false t.socket messages) let recv t = request t t.receivers (fun () -> Zmq.Socket.recv ~block:false t.socket) @@ -135,10 +134,10 @@ let recv_msg t = request t t.receivers (fun () -> Zmq.Socket.recv_msg ~block:false t.socket) let recv_all t = - request t t.receivers (fun () -> Zmq.Socket.recv_all ~block:false t.socket) + request t t.receivers (Zmq.Socket.recv_all_r ~block:false t.socket) let recv_msg_all t = - request t t.receivers (fun () -> Zmq.Socket.recv_msg_all ~block:false t.socket) + request t t.receivers (Zmq.Socket.recv_msg_all_r ~block:false t.socket) module Router = struct type id_t = string @@ -155,5 +154,5 @@ module Router = struct end module Monitor = struct - let recv t = request t t.receivers (fun () -> Zmq.Monitor.recv ~block:false t.socket) + let recv t = request t t.receivers (Zmq.Monitor.recv_r ~block:false t.socket) end diff --git a/zmq/src/zmq.ml b/zmq/src/zmq.ml index d590552..974c896 100644 --- a/zmq/src/zmq.ml +++ b/zmq/src/zmq.ml @@ -13,6 +13,9 @@ exception ZMQ_exception of error * string external version : unit -> int * int * int = "caml_zmq_version" +type 'a resumable = unit -> 'a + + module Context = struct type t @@ -492,7 +495,6 @@ module Socket = struct type event = No_event | Poll_in | Poll_out | Poll_in_out | Poll_error external events : 'a t -> event = "caml_zmq_get_events" - type 'a resumable = unit -> 'a (** Allow resuming receive of a multipart message. The function returns a function that can be "resumed" in case of EGAGIN or EINTR. @@ -528,6 +530,7 @@ module Socket = struct in cont f ?block socket + (** This function should never be used. It does not allow resuming if a signal is received, which may leave half read multi part messages on the socket. *) let recv_all_wrapper (f : ?block:bool -> _ t -> _) = (* Once the first message part is received all remaining message parts can be received without blocking. *) @@ -683,6 +686,20 @@ module Monitor = struct let addr = Socket.recv ~block:false socket in decode_monitor_event event addr + let recv_r ?block socket = + let event = ref None in + let rec cont () = + match !event with + | None -> + event := Some (Socket.recv ?block socket); + cont () + | Some event -> + assert (Socket.has_more socket); + let addr = Socket.recv ~block:false socket in + decode_monitor_event event addr + in + cont + let get_peer_address fd = try let sockaddr = Unix.getpeername fd in diff --git a/zmq/src/zmq.mli b/zmq/src/zmq.mli index c73ca1a..cab82bf 100644 --- a/zmq/src/zmq.mli +++ b/zmq/src/zmq.mli @@ -10,6 +10,10 @@ type error = exception ZMQ_exception of error * string +(** Resumable. + Allows repeated calls until the operation has completed. Usefull to repeat calls if EINTR or EGAGIN is raised *) +type 'a resumable = unit -> 'a + val version : unit -> int * int * int module Context : sig @@ -95,10 +99,6 @@ module Socket : sig val bind : 'a t -> string -> unit val unbind : 'a t -> string -> unit - (** Resumable. - Allows repeated calls until the operation has completed. Usefull to repeat calls if EINTR or EGAGIN is raised *) - type 'a resumable = unit -> 'a - (** Read a message from the socket. block indicates if the call should be blocking or non-blocking. If block is [false], [recv] will raise [Unix.Unix_error (Unix.EAGAIN, _, _)] if there are no messages available to receive on the specified socket. @@ -345,10 +345,17 @@ module Monitor : sig val connect: Context.t -> t -> [<`Monitor] Socket.t (** Receive an event from the monitor socket. - block indicates if the call should be blocking or non-blocking. Default true + @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. + @deprecated Use [recv_r] to allow resuming the operation *) val recv: ?block:bool -> [< `Monitor ] Socket.t -> event + (** Receive an event from the monitor socket. + The function returns a [resumable] to allow resuming the operation in case of EINTR or EGAGIN. + @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. + *) + val recv_r: ?block:bool -> [< `Monitor ] Socket.t -> event resumable + val string_of_event: event -> string (** Create a memorizing function for converting an event to a string. From 8a78e223774197b797d3ae64c6ed2119aa114093 Mon Sep 17 00:00:00 2001 From: Anders Fugmann Date: Thu, 1 May 2025 23:34:53 +0200 Subject: [PATCH 5/8] Update documentation --- zmq/src/zmq.mli | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/zmq/src/zmq.mli b/zmq/src/zmq.mli index cab82bf..2cc7e37 100644 --- a/zmq/src/zmq.mli +++ b/zmq/src/zmq.mli @@ -11,7 +11,19 @@ type error = exception ZMQ_exception of error * string (** Resumable. - Allows repeated calls until the operation has completed. Usefull to repeat calls if EINTR or EGAGIN is raised *) + Allows repeated calls until the operation has + completed. + This is needed when reading or writing multipart messages. If EINTR is not + handled corretly, the socket may be left with a half written/read + message which will break subsequent operations. + + For this reason, if EINTR is raised its important to repeat the + call to the resumable to avoid the the socket go into a broken + state. + + For all [resumable]s the creation of the resumable does not send or receive any messages - that happend only when the resumable is evaluated. + +*) type 'a resumable = unit -> 'a val version : unit -> int * int * int @@ -108,13 +120,13 @@ module Socket : sig (** Read a complete multipart message from the socket. @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. - @deprecated Use [recv_all_r] to allow resuming the operation + @deprecated This function is unsafe wtr EINTR signals. Use {!recv_all_r} to allow resuming the operation *) val recv_all : ?block:bool -> 'a t -> string list (** Read a complete multipart message from the socket. @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. - The function returns a [resumable] to allow resuming the operation in case of EINTR or EGAGIN. + @return {!resumable} to allow resuming the operation in case of EINTR or EGAGIN. *) val recv_all_r : ?block:bool -> 'a t -> string list resumable @@ -128,13 +140,13 @@ module Socket : sig (** Send a multipart message to the socket. @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. - @deprecated Use [send_all_r] to allow resuming the operation + @deprecated This function is unsafe wtr EINTR signals. Use {!send_all_r} to allow resuming the operation *) val send_all : ?block:bool -> 'a t -> string list -> unit (** Send a multipart message to the socket. @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. - The function returns a [resumable] to allow resuming the operation in case of EINTR or EGAGIN. + @return {!resumable} to allow resuming the operation in case of EINTR or EGAGIN. *) val send_all_r : ?block:bool -> 'a t -> string list -> unit resumable @@ -147,13 +159,13 @@ module Socket : sig (** Receive a multi-part message on the socket. @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. - @deprecated Use [recv_msg_all_r] to allow resuming the operation + @deprecated This function is unsafe wtr EINTR signals. Use {!recv_msg_all_r} to allow resuming the operation *) val recv_msg_all : ?block:bool -> 'a t -> Msg.t list (** Receive a multi-part message on the socket. @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. - The function returns a [resumable] to allow resuming the operation in case of EINTR or EGAGIN. + @return {!resumable} to allow resuming the operation in case of EINTR or EGAGIN. *) val recv_msg_all_r : ?block:bool -> 'a t -> Msg.t list resumable @@ -168,14 +180,14 @@ module Socket : sig (** Send a multi-part message to the socket. @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. - @deprecated Use [send_msg_all_r] to allow resuming the operation + @deprecated This function is unsafe wtr EINTR signals. Use {!send_msg_all_r} to allow resuming the operation *) val send_msg_all : ?block:bool -> 'a t -> Msg.t list -> unit (** Send a multi-part message to the socket. @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. - The function returns a [resumable] to allow resuming the operation in case of EINTR or EGAGIN. + @return {!resumable} to allow resuming the operation in case of EINTR or EGAGIN. *) val send_msg_all_r : ?block:bool -> 'a t -> Msg.t list -> unit resumable @@ -346,13 +358,13 @@ module Monitor : sig (** Receive an event from the monitor socket. @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. - @deprecated Use [recv_r] to allow resuming the operation + @deprecated This function is unsafe wtr EINTR signals. Use {!recv_r} to allow resuming the operation *) val recv: ?block:bool -> [< `Monitor ] Socket.t -> event (** Receive an event from the monitor socket. - The function returns a [resumable] to allow resuming the operation in case of EINTR or EGAGIN. @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. + @return {!resumable} to allow resuming the operation in case of EINTR or EGAGIN. *) val recv_r: ?block:bool -> [< `Monitor ] Socket.t -> event resumable From 8b1ebb78255c0ccbfb8489bde5d0ca6fa820bd15 Mon Sep 17 00:00:00 2001 From: Anders Fugmann Date: Thu, 1 May 2025 23:52:42 +0200 Subject: [PATCH 6/8] Add test for EAGAIN --- zmq/test/zmq_test.ml | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/zmq/test/zmq_test.ml b/zmq/test/zmq_test.ml index 57a2afe..89679d6 100644 --- a/zmq/test/zmq_test.ml +++ b/zmq/test/zmq_test.ml @@ -216,6 +216,23 @@ let test_unix_exceptions = bracket Zmq.Context.terminate ctx ) +(** Simple test to test eagain is raised when reading nonblocking from an empty queue *) +let test_zmq_eagain = bracket + (fun () -> + let ctx = Zmq.Context.create () in + let s = Zmq.Socket.create ctx pull in + (ctx, s) + ) + (fun (_, s) -> + assert_raises ~msg:"Failed to raise EAGAIN" Unix.(Unix_error(EAGAIN, "zmq_msg_recv", "")) (fun _ -> Zmq.Socket.recv ~block:false s); + () + ) + (fun (ctx, s) -> + Zmq.Socket.close s; + Zmq.Context.terminate ctx + ) + + (** Test a Zmq specific exception *) let test_zmq_exception = bracket (fun () -> @@ -233,6 +250,8 @@ let test_zmq_exception = bracket Zmq.Context.terminate ctx; ) + + let test_socket_gc () = let sock = let ctx = Zmq.Context.create () in @@ -369,7 +388,8 @@ let suite = "monitor" >:: test_monitor; "z85 encoding/decoding" >:: test_z85; "unix exceptions" >:: test_unix_exceptions; - "zmq exceptions" >:: test_zmq_exception; + "zmq exception intr" >:: test_zmq_exception; + "zmq exception eagain" >:: test_zmq_eagain; (* Gc tests disabled, as resources will not be freed through finalisers "socket gc" >:: test_socket_gc; "context gc" >:: test_context_gc; From 9c5332c146053fe6f0407e3630e70fc20272a2f5 Mon Sep 17 00:00:00 2001 From: Anders Fugmann Date: Thu, 1 May 2025 23:53:20 +0200 Subject: [PATCH 7/8] update changelog --- CHANGES.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 070f3eb..67ebb1c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,17 @@ +6.0.0 Unreleased. +--- +* Solve problem with EINTR. + Add `send_all_r`, `send_all_msg_r`, `recv_all_r` and `recv_all_msg_r` + to allow resuming if the calls raise EAGIN or EINTR. + +* Deprecate `send_all`, `send_all_msg`, `recv_all` and `recv_all_msg` + as resumable functions should be used instead. + +* Fix deprecation warning on use of Async_kernel.Ivar.fill + +* lwt and async now retry on EINTR and EAGAIN and handles + recv|send_*_all function correctly. + 5.3.0 --- * Add eio binding in zmq-eio (#126, @andersfugmann) From 77c3ef05b9434434d45d29001398542f1195eb82 Mon Sep 17 00:00:00 2001 From: Anders Peter Fugmann Date: Fri, 2 May 2025 00:42:46 +0200 Subject: [PATCH 8/8] Update zmq/src/zmq.mli Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- zmq/src/zmq.mli | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zmq/src/zmq.mli b/zmq/src/zmq.mli index 2cc7e37..b6619c0 100644 --- a/zmq/src/zmq.mli +++ b/zmq/src/zmq.mli @@ -126,7 +126,7 @@ module Socket : sig (** Read a complete multipart message from the socket. @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. - @return {!resumable} to allow resuming the operation in case of EINTR or EGAGIN. + @return {!resumable} to allow resuming the operation in case of EINTR or EAGAIN. *) val recv_all_r : ?block:bool -> 'a t -> string list resumable