diff --git a/CHANGES.md b/CHANGES.md index 070f3eb..67ebb1c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,17 @@ +6.0.0 Unreleased. +--- +* Solve problem with EINTR. + Add `send_all_r`, `send_all_msg_r`, `recv_all_r` and `recv_all_msg_r` + to allow resuming if the calls raise EAGIN or EINTR. + +* Deprecate `send_all`, `send_all_msg`, `recv_all` and `recv_all_msg` + as resumable functions should be used instead. + +* Fix deprecation warning on use of Async_kernel.Ivar.fill + +* lwt and async now retry on EINTR and EAGAIN and handles + recv|send_*_all function correctly. + 5.3.0 --- * Add eio binding in zmq-eio (#126, @andersfugmann) diff --git a/zmq-async/src/deferred.ml b/zmq-async/src/deferred.ml index fb46f10..b9cb1de 100644 --- a/zmq-async/src/deferred.ml +++ b/zmq-async/src/deferred.ml @@ -26,7 +26,7 @@ end module Mailbox = struct type 'a t = 'a Async_kernel.Ivar.t let create () = Async_kernel.Ivar.create () - let send t v = Async_kernel.Ivar.fill t v + let send t v = Async_kernel.Ivar.fill_exn t v let recv t = Async_kernel.Ivar.read t end diff --git a/zmq-deferred/src/socket.ml b/zmq-deferred/src/socket.ml index 0d34713..9bedf8b 100644 --- a/zmq-deferred/src/socket.ml +++ b/zmq-deferred/src/socket.ml @@ -159,11 +159,11 @@ module Make(T: Deferred.T) = struct t type op = Send | Receive - let post: _ t -> op -> (_ Zmq.Socket.t -> 'a) -> 'a Deferred.t = fun t op f -> - let f' mailbox () = - let res = match f t.socket with + let post: _ t -> op -> (_ Zmq.Socket.t -> unit -> 'a) -> 'a Deferred.t = fun t op f -> + let f' f mailbox () = + let res = match f () with | v -> Ok v - | exception Unix.Unix_error (Unix.EAGAIN, _, _) -> + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EINTR), _, _) -> (* Signal try again *) raise Retry | exception exn -> Error exn @@ -176,7 +176,7 @@ module Make(T: Deferred.T) = struct in let mailbox = Mailbox.create () in let should_signal = Queue.is_empty queue in - Queue.push (f' mailbox) queue; + Queue.push (f' (f t.socket) mailbox) queue; (* Wakeup the thread if the queue was empty *) begin @@ -191,38 +191,25 @@ module Make(T: Deferred.T) = struct let to_socket t = t.socket - let recv s = post s Receive (fun s -> Zmq.Socket.recv ~block:false s) - let send s m = post s Send (fun s -> Zmq.Socket.send ~block:false s m) + let recv s = post s Receive (fun s () -> Zmq.Socket.recv ~block:false s) + let send s m = post s Send (fun s () -> Zmq.Socket.send ~block:false s m) - let recv_msg s = post s Receive (fun s -> Zmq.Socket.recv_msg ~block:false s) + let recv_msg s = post s Receive (fun s () -> Zmq.Socket.recv_msg ~block:false s) let send_msg s m = - post s Send (fun s -> Zmq.Socket.send_msg ~block:false s m) + post s Send (fun s () -> Zmq.Socket.send_msg ~block:false s m) (** Recevie all message blocks. *) let recv_all s = - (* The documentaton says that either all message parts are - transmitted, or none. So once a message becomes available, all - parts can be read wothout blocking. - - Also receiving a multipart message must not be interleaved with - another receving thread on the same socket. - - We could have a read-mutex and a write mutex in order to limit - potential starvation of other threads while reading large - multipart messages. - - *) - post s Receive (fun s -> Zmq.Socket.recv_all ~block:false s) + post s Receive (fun s -> Zmq.Socket.recv_all_r ~block:false s) let send_all s parts = - (* See the comment in recv_all. *) - post s Send (fun s -> Zmq.Socket.send_all ~block:false s parts) + post s Send (fun s -> Zmq.Socket.send_all_r ~block:false s parts) let recv_msg_all s = - post s Receive (fun s -> Zmq.Socket.recv_msg_all ~block:false s) + post s Receive (fun s -> Zmq.Socket.recv_msg_all_r ~block:false s) let send_msg_all s parts = - post s Send (fun s -> Zmq.Socket.send_msg_all ~block:false s parts) + post s Send (fun s -> Zmq.Socket.send_msg_all_r ~block:false s parts) let close t = t.closing <- true; @@ -247,7 +234,7 @@ module Make(T: Deferred.T) = struct end module Monitor = struct - let recv s = post s Receive (fun s -> Zmq.Monitor.recv ~block:false s) + let recv s = post s Receive (fun s -> Zmq.Monitor.recv_r ~block:false s) end end diff --git a/zmq-eio/src/socket.ml b/zmq-eio/src/socket.ml index a831745..be0874c 100644 --- a/zmq-eio/src/socket.ml +++ b/zmq-eio/src/socket.ml @@ -21,9 +21,8 @@ let process queue = | () -> let (_: unit -> unit) = Queue.pop queue in () - | exception Unix.Unix_error (Unix.EAGAIN, _, _) -> - (* If f raised EAGAIN, dont pop the message. *) - (* This should never happen. If so, the queue could be replaced with a Eio.Stream for faster handling *) + | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EINTR), _, _) -> + (* Leave the function on the queue to be retried *) () let with_lock lock f = @@ -123,10 +122,10 @@ let send_msg t message = request t t.senders (fun () -> Zmq.Socket.send_msg ~block:false t.socket message) let send_all t messages = - request t t.senders (fun () -> Zmq.Socket.send_all ~block:false t.socket messages) + request t t.senders (Zmq.Socket.send_all_r ~block:false t.socket messages) let send_msg_all t messages = - request t t.senders (fun () -> Zmq.Socket.send_msg_all ~block:false t.socket messages) + request t t.senders (Zmq.Socket.send_msg_all_r ~block:false t.socket messages) let recv t = request t t.receivers (fun () -> Zmq.Socket.recv ~block:false t.socket) @@ -135,10 +134,10 @@ let recv_msg t = request t t.receivers (fun () -> Zmq.Socket.recv_msg ~block:false t.socket) let recv_all t = - request t t.receivers (fun () -> Zmq.Socket.recv_all ~block:false t.socket) + request t t.receivers (Zmq.Socket.recv_all_r ~block:false t.socket) let recv_msg_all t = - request t t.receivers (fun () -> Zmq.Socket.recv_msg_all ~block:false t.socket) + request t t.receivers (Zmq.Socket.recv_msg_all_r ~block:false t.socket) module Router = struct type id_t = string @@ -155,5 +154,5 @@ module Router = struct end module Monitor = struct - let recv t = request t t.receivers (fun () -> Zmq.Monitor.recv ~block:false t.socket) + let recv t = request t t.receivers (Zmq.Monitor.recv_r ~block:false t.socket) end diff --git a/zmq/src/zmq.ml b/zmq/src/zmq.ml index 5b5f3bf..974c896 100644 --- a/zmq/src/zmq.ml +++ b/zmq/src/zmq.ml @@ -13,6 +13,9 @@ exception ZMQ_exception of error * string external version : unit -> int * int * int = "caml_zmq_version" +type 'a resumable = unit -> 'a + + module Context = struct type t @@ -492,6 +495,42 @@ module Socket = struct type event = No_event | Poll_in | Poll_out | Poll_in_out | Poll_error external events : 'a t -> event = "caml_zmq_get_events" + + (** Allow resuming receive of a multipart message. + The function returns a function that can be "resumed" in case of EGAGIN or EINTR. + *) + let recv_all_wrapper_resumable: (?block:bool -> _ t -> 'a) -> ?block:bool -> _ t -> 'a list resumable = fun f ?block socket -> + let received = ref [] in + let rec cont f ?block () = + let message = f ?block socket in + received := message :: !received; + match has_more socket with + | true -> + cont f ~block:false () + | false -> + (* Convert the queue to a list *) + List.rev !received + in + cont f ?block + + (** Allow resuming send of a multipart message. + The function returns a function that can be "resumed" in case of EGAGIN or EINTR. + *) + let send_all_wrapper_resumable: (?block:bool -> ?more:bool -> _ t -> 'a -> unit) -> ?block:bool -> _ t -> 'a list -> unit resumable = fun f ?block socket messages -> + let messages = ref messages in + let rec cont (f: (?block:bool -> ?more:bool -> _ t -> 'a -> unit)) ?block socket () = + match !messages with + | [] -> () + | [ msg ] -> + f ?block ~more:false socket msg + | msg :: msgs -> + f ?block ~more:true socket msg; + messages := msgs; + cont f ~block:true socket () + in + cont f ?block socket + + (** This function should never be used. It does not allow resuming if a signal is received, which may leave half read multi part messages on the socket. *) let recv_all_wrapper (f : ?block:bool -> _ t -> _) = (* Once the first message part is received all remaining message parts can be received without blocking. *) @@ -529,14 +568,26 @@ module Socket = struct let recv_all ?block socket = recv_all_wrapper recv ?block socket + let recv_all_r ?block socket = + recv_all_wrapper_resumable recv ?block socket + let send_all ?block socket message = send_all_wrapper send ?block socket message + let send_all_r ?block socket message = + send_all_wrapper_resumable send ?block socket message + let recv_msg_all ?block socket = recv_all_wrapper recv_msg ?block socket + let recv_msg_all_r ?block socket = + recv_all_wrapper_resumable recv_msg ?block socket + let send_msg_all ?block socket message = send_all_wrapper send_msg ?block socket message + + let send_msg_all_r ?block socket message = + send_all_wrapper_resumable send_msg ?block socket message end module Proxy = struct @@ -635,6 +686,20 @@ module Monitor = struct let addr = Socket.recv ~block:false socket in decode_monitor_event event addr + let recv_r ?block socket = + let event = ref None in + let rec cont () = + match !event with + | None -> + event := Some (Socket.recv ?block socket); + cont () + | Some event -> + assert (Socket.has_more socket); + let addr = Socket.recv ~block:false socket in + decode_monitor_event event addr + in + cont + let get_peer_address fd = try let sockaddr = Unix.getpeername fd in diff --git a/zmq/src/zmq.mli b/zmq/src/zmq.mli index 68e7cbc..b6619c0 100644 --- a/zmq/src/zmq.mli +++ b/zmq/src/zmq.mli @@ -10,6 +10,22 @@ type error = exception ZMQ_exception of error * string +(** Resumable. + Allows repeated calls until the operation has + completed. + This is needed when reading or writing multipart messages. If EINTR is not + handled corretly, the socket may be left with a half written/read + message which will break subsequent operations. + + For this reason, if EINTR is raised its important to repeat the + call to the resumable to avoid the the socket go into a broken + state. + + For all [resumable]s the creation of the resumable does not send or receive any messages - that happend only when the resumable is evaluated. + +*) +type 'a resumable = unit -> 'a + val version : unit -> int * int * int module Context : sig @@ -103,51 +119,78 @@ module Socket : sig val recv : ?block:bool -> 'a t -> string (** Read a complete multipart message from the socket. - block indicates if the call should be blocking or non-blocking. Default true + @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. + @deprecated This function is unsafe wtr EINTR signals. Use {!recv_all_r} to allow resuming the operation *) val recv_all : ?block:bool -> 'a t -> string list + (** Read a complete multipart message from the socket. + @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. + @return {!resumable} to allow resuming the operation in case of EINTR or EAGAIN. + *) + val recv_all_r : ?block:bool -> 'a t -> string list resumable + (** Send a message to the socket. - block indicates if the call should be blocking or non-blocking. Default true - more is used for multipart messages, and indicates that the more message parts will follow. Default false + @param block indicates if the call should be blocking or non-blocking. + Defaults to [true]. + @param more indicate that more messages will follow and will be joined into a multipart message. + Defaults to [true]. *) val send : ?block:bool -> ?more:bool -> 'a t -> string -> unit (** Send a multipart message to the socket. - block indicates if the call should be blocking or non-blocking. Default true + @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. + @deprecated This function is unsafe wtr EINTR signals. Use {!send_all_r} to allow resuming the operation *) val send_all : ?block:bool -> 'a t -> string list -> unit - (** Receive a {!Msg.t} on the socket. + (** Send a multipart message to the socket. + @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. + @return {!resumable} to allow resuming the operation in case of EINTR or EGAGIN. + *) + val send_all_r : ?block:bool -> 'a t -> string list -> unit resumable + + (** Receive a {!Msg.t} on the socket. @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. *) val recv_msg : ?block:bool -> 'a t -> Msg.t (** Receive a multi-part message on the socket. - - @param block indicates if the call should be blocking or non-blocking. - Defaults to [true]. + @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. + @deprecated This function is unsafe wtr EINTR signals. Use {!recv_msg_all_r} to allow resuming the operation *) val recv_msg_all : ?block:bool -> 'a t -> Msg.t list - (** Send a {!Msg.t} to the socket. + (** Receive a multi-part message on the socket. + @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. + @return {!resumable} to allow resuming the operation in case of EINTR or EGAGIN. + *) + val recv_msg_all_r : ?block:bool -> 'a t -> Msg.t list resumable +(** Send a {!Msg.t} to the socket. @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. @param more is used for multipart messages Set to [true] to indicate that - more message parts will follow. Defaults to [false]. + more message parts will follow. Defaults to [false]. *) val send_msg : ?block:bool -> ?more:bool -> 'a t -> Msg.t -> unit (** Send a multi-part message to the socket. - @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. + @deprecated This function is unsafe wtr EINTR signals. Use {!send_msg_all_r} to allow resuming the operation *) val send_msg_all : ?block:bool -> 'a t -> Msg.t list -> unit + (** Send a multi-part message to the socket. + @param block indicates if the call should be blocking or non-blocking. + Defaults to [true]. + @return {!resumable} to allow resuming the operation in case of EINTR or EGAGIN. + *) + val send_msg_all_r : ?block:bool -> 'a t -> Msg.t list -> unit resumable + (** Option Getter and Setters *) (** Set the maximum message size of a message sent in this context, @@ -314,10 +357,17 @@ module Monitor : sig val connect: Context.t -> t -> [<`Monitor] Socket.t (** Receive an event from the monitor socket. - block indicates if the call should be blocking or non-blocking. Default true + @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. + @deprecated This function is unsafe wtr EINTR signals. Use {!recv_r} to allow resuming the operation *) val recv: ?block:bool -> [< `Monitor ] Socket.t -> event + (** Receive an event from the monitor socket. + @param block indicates if the call should be blocking or non-blocking. Defaults to [true]. + @return {!resumable} to allow resuming the operation in case of EINTR or EGAGIN. + *) + val recv_r: ?block:bool -> [< `Monitor ] Socket.t -> event resumable + val string_of_event: event -> string (** Create a memorizing function for converting an event to a string. diff --git a/zmq/test/zmq_test.ml b/zmq/test/zmq_test.ml index c82038b..89679d6 100644 --- a/zmq/test/zmq_test.ml +++ b/zmq/test/zmq_test.ml @@ -216,6 +216,23 @@ let test_unix_exceptions = bracket Zmq.Context.terminate ctx ) +(** Simple test to test eagain is raised when reading nonblocking from an empty queue *) +let test_zmq_eagain = bracket + (fun () -> + let ctx = Zmq.Context.create () in + let s = Zmq.Socket.create ctx pull in + (ctx, s) + ) + (fun (_, s) -> + assert_raises ~msg:"Failed to raise EAGAIN" Unix.(Unix_error(EAGAIN, "zmq_msg_recv", "")) (fun _ -> Zmq.Socket.recv ~block:false s); + () + ) + (fun (ctx, s) -> + Zmq.Socket.close s; + Zmq.Context.terminate ctx + ) + + (** Test a Zmq specific exception *) let test_zmq_exception = bracket (fun () -> @@ -233,6 +250,8 @@ let test_zmq_exception = bracket Zmq.Context.terminate ctx; ) + + let test_socket_gc () = let sock = let ctx = Zmq.Context.create () in @@ -311,11 +330,18 @@ let suite = let endpoint = "inproc://endpoint" in bind rep endpoint; connect req endpoint; - send_all req ["request"; "and more"]; - let msg = recv_all rep in + let send = send_all_r req ["request"; "and more"] in + send (); + + + let recv = recv_all_r rep in + let msg = recv () in assert_equal ["request"; "and more"] msg; - send_all rep ["reply"; "and more"]; - let msg = recv_all req in + let send = send_all_r rep ["reply"; "and more"] in + send (); + let recv = recv_all_r req in + let msg = recv () in + assert_equal ["reply"; "and more"] msg ) (fun (ctx, req, rep) -> @@ -362,7 +388,8 @@ let suite = "monitor" >:: test_monitor; "z85 encoding/decoding" >:: test_z85; "unix exceptions" >:: test_unix_exceptions; - "zmq exceptions" >:: test_zmq_exception; + "zmq exception intr" >:: test_zmq_exception; + "zmq exception eagain" >:: test_zmq_eagain; (* Gc tests disabled, as resources will not be freed through finalisers "socket gc" >:: test_socket_gc; "context gc" >:: test_context_gc;