diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index df2d7564f555..bc99a83ee37a 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -127,7 +127,7 @@ -record(link, {name :: link_name(), ref :: link_ref(), - state = detached :: detached | attach_sent | attached | detach_sent, + state = detached :: detached | attach_sent | attached | attach_refused | detach_sent, notify :: pid(), output_handle :: output_handle(), input_handle :: input_handle() | undefined, @@ -325,9 +325,11 @@ mapped(cast, #'v1_0.end'{} = End, State) -> ok = notify_session_ended(End, State), {stop, normal, State}; mapped(cast, #'v1_0.attach'{name = {utf8, Name}, - initial_delivery_count = IDC, handle = {uint, InHandle}, role = PeerRoleBool, + source = Source, + target = Target, + initial_delivery_count = IDC, max_message_size = MaybeMaxMessageSize} = Attach, #state{links = Links, link_index = LinkIndex, link_handle_index = LHI} = State0) -> @@ -339,20 +341,28 @@ mapped(cast, #'v1_0.attach'{name = {utf8, Name}, #{OutHandle := Link0} = Links, ok = notify_link_attached(Link0, Attach, State0), - {DeliveryCount, MaxMessageSize} = + {LinkState, DeliveryCount, MaxMessageSize} = case Link0 of #link{role = sender = OurRole, delivery_count = DC} -> + LS = case Target of + #'v1_0.target'{} -> attached; + _ -> attach_refused + end, MSS = case MaybeMaxMessageSize of {ulong, S} when S > 0 -> S; _ -> undefined end, - {DC, MSS}; + {LS, DC, MSS}; #link{role = receiver = OurRole, max_message_size = MSS} -> - {unpack(IDC), MSS} + LS = case Source of + #'v1_0.source'{} -> attached; + _ -> attach_refused + end, + {LS, unpack(IDC), MSS} end, - Link = Link0#link{state = attached, + Link = Link0#link{state = LinkState, input_handle = InHandle, delivery_count = DeliveryCount, max_message_size = MaxMessageSize}, @@ -496,43 +506,31 @@ mapped({call, From}, {keep_state_and_data, {reply, From, {error, remote_incoming_window_exceeded}}}; mapped({call, From = {Pid, _}}, {transfer, #'v1_0.transfer'{handle = {uint, OutHandle}, - delivery_tag = {binary, DeliveryTag}, - settled = false} = Transfer0, Sections}, - #state{outgoing_delivery_id = DeliveryId, links = Links, - outgoing_unsettled = Unsettled} = State) -> + delivery_tag = DeliveryTag, + settled = Settled} = Transfer0, Sections}, + #state{outgoing_delivery_id = DeliveryId, + links = Links, + outgoing_unsettled = Unsettled0} = State0) -> case Links of + #{OutHandle := #link{state = attach_refused}} -> + {keep_state_and_data, {reply, From, {error, attach_refused}}}; #{OutHandle := #link{input_handle = undefined}} -> {keep_state_and_data, {reply, From, {error, half_attached}}}; #{OutHandle := #link{link_credit = LC}} when LC =< 0 -> {keep_state_and_data, {reply, From, {error, insufficient_credit}}}; - #{OutHandle := Link = #link{max_message_size = MaxMessageSize, - footer_opt = FooterOpt}} -> + #{OutHandle := #link{max_message_size = MaxMessageSize, + footer_opt = FooterOpt} = Link} -> Transfer = Transfer0#'v1_0.transfer'{delivery_id = uint(DeliveryId)}, - case send_transfer(Transfer, Sections, FooterOpt, MaxMessageSize, State) of - {ok, NumFrames} -> - State1 = State#state{outgoing_unsettled = Unsettled#{DeliveryId => {DeliveryTag, Pid}}}, - {keep_state, book_transfer_send(NumFrames, Link, State1), {reply, From, ok}}; - Error -> - {keep_state_and_data, {reply, From, Error}} - end; - _ -> - {keep_state_and_data, {reply, From, {error, link_not_found}}} - - end; -mapped({call, From}, - {transfer, #'v1_0.transfer'{handle = {uint, OutHandle}} = Transfer0, - Sections}, #state{outgoing_delivery_id = DeliveryId, - links = Links} = State) -> - case Links of - #{OutHandle := #link{input_handle = undefined}} -> - {keep_state_and_data, {reply, From, {error, half_attached}}}; - #{OutHandle := #link{link_credit = LC}} when LC =< 0 -> - {keep_state_and_data, {reply, From, {error, insufficient_credit}}}; - #{OutHandle := Link = #link{max_message_size = MaxMessageSize, - footer_opt = FooterOpt}} -> - Transfer = Transfer0#'v1_0.transfer'{delivery_id = uint(DeliveryId)}, - case send_transfer(Transfer, Sections, FooterOpt, MaxMessageSize, State) of + case send_transfer(Transfer, Sections, FooterOpt, MaxMessageSize, State0) of {ok, NumFrames} -> + State = case Settled of + true -> + State0; + false -> + {binary, Tag} = DeliveryTag, + Unsettled = Unsettled0#{DeliveryId => {Tag, Pid}}, + State0#state{outgoing_unsettled = Unsettled} + end, {keep_state, book_transfer_send(NumFrames, Link, State), {reply, From, ok}}; Error -> {keep_state_and_data, {reply, From, Error}} @@ -688,21 +686,28 @@ send_flow_link(OutHandle, never -> never; _ -> {RenewWhenBelow, Credit} end, - #{OutHandle := #link{output_handle = H, + #{OutHandle := #link{state = LinkState, + output_handle = H, role = receiver, delivery_count = DeliveryCount, available = Available} = Link} = Links, - Flow1 = Flow0#'v1_0.flow'{ - handle = uint(H), - %% "In the event that the receiving link endpoint has not yet seen the - %% initial attach frame from the sender this field MUST NOT be set." [2.7.4] - delivery_count = maybe_uint(DeliveryCount), - available = uint(Available)}, - Flow = set_flow_session_fields(Flow1, State), - ok = send(Flow, State), - State#state{links = Links#{OutHandle => - Link#link{link_credit = Credit, - auto_flow = AutoFlow}}}. + case LinkState of + attach_refused -> + %% We will receive the DETACH frame shortly. + State; + _ -> + Flow1 = Flow0#'v1_0.flow'{ + handle = uint(H), + %% "In the event that the receiving link endpoint has not yet seen the + %% initial attach frame from the sender this field MUST NOT be set." [2.7.4] + delivery_count = maybe_uint(DeliveryCount), + available = uint(Available)}, + Flow = set_flow_session_fields(Flow1, State), + ok = send(Flow, State), + State#state{links = Links#{OutHandle => + Link#link{link_credit = Credit, + auto_flow = AutoFlow}}} + end. send_flow_session(State) -> Flow = set_flow_session_fields(#'v1_0.flow'{}, State), diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index b922cca1b2db..4b7c7359b7d6 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -52,6 +52,7 @@ groups() -> ]}, {mock, [], [ insufficient_credit, + attach_refused, incoming_heartbeat, multi_transfer_without_delivery_id ]} @@ -772,11 +773,13 @@ insufficient_credit(Config) -> outgoing_window = {uint, 1000}} ]} end, - AttachStep = fun({0 = Ch, #'v1_0.attach'{role = false, - name = Name}, <<>>}) -> + AttachStep = fun({0 = Ch, #'v1_0.attach'{name = Name, + role = false, + target = Target}, <<>>}) -> {Ch, [#'v1_0.attach'{name = Name, handle = {uint, 99}, - role = true}]} + role = true, + target = Target}]} end, Steps = [fun mock_server:recv_amqp_header_step/1, fun mock_server:send_amqp_header_step/1, @@ -799,6 +802,52 @@ insufficient_credit(Config) -> ok = amqp10_client:close_connection(Connection), ok. +attach_refused(Config) -> + Hostname = ?config(mock_host, Config), + Port = ?config(mock_port, Config), + OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) -> + {Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]} + end, + BeginStep = fun({0 = Ch, #'v1_0.begin'{}, _Pay}) -> + {Ch, [#'v1_0.begin'{remote_channel = {ushort, Ch}, + next_outgoing_id = {uint, 1}, + incoming_window = {uint, 1000}, + outgoing_window = {uint, 1000}} + ]} + end, + AttachStep = fun({0 = Ch, #'v1_0.attach'{name = Name, + role = false}, <<>>}) -> + %% We test only the 1st stage of link refusal: + %% Server replies with its local terminus set to null. + %% We omit the 2nd stage (the detach frame). + {Ch, [#'v1_0.attach'{name = Name, + handle = {uint, 99}, + role = true, + target = undefined}]} + end, + Steps = [fun mock_server:recv_amqp_header_step/1, + fun mock_server:send_amqp_header_step/1, + mock_server:amqp_step(OpenStep), + mock_server:amqp_step(BeginStep), + mock_server:amqp_step(AttachStep)], + + ok = mock_server:set_steps(?config(mock_server, Config), Steps), + + Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()}, + {ok, Connection} = amqp10_client:open_connection(Cfg), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"mock1-sender">>, + <<"test">>), + await_link(Sender, attached, attached_timeout), + Msg = amqp10_msg:new(<<"mock-tag">>, <<"banana">>, true), + %% We expect that the lib prevents the app from sending messages + %% in this intermediate link refusal state. + ?assertEqual({error, attach_refused}, + amqp10_client:send_msg(Sender, Msg)), + + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection). + multi_transfer_without_delivery_id(Config) -> Hostname = ?config(mock_host, Config), Port = ?config(mock_port, Config), diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 40baa3521c6f..02200450852d 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -967,7 +967,7 @@ handle_frame({Performative = #'v1_0.transfer'{handle = ?UINT(Handle)}, Paylaod}, {Reply, State} = case IncomingLinks of #{Handle := Link0} -> - case incoming_link_transfer(Performative, Paylaod, Link0, State1) of + try incoming_link_transfer(Performative, Paylaod, Link0, State1) of {ok, Reply0, Link, State2} -> {Reply0, State2#state{incoming_links = IncomingLinks#{Handle := Link}}}; {error, Reply0} -> @@ -975,6 +975,9 @@ handle_frame({Performative = #'v1_0.transfer'{handle = ?UINT(Handle)}, Paylaod}, %% with appropriate error information supplied in the error field of the %% detach frame. The link endpoint MUST then be destroyed." [2.6.5] {Reply0, State1#state{incoming_links = maps:remove(Handle, IncomingLinks)}} + catch {link_error, Error} -> + Detach = detach(Handle, Link0, Error), + {[Detach], State1#state{incoming_links = maps:remove(Handle, IncomingLinks)}} end; _ -> incoming_mgmt_link_transfer(Performative, Paylaod, State1) @@ -1110,16 +1113,46 @@ handle_frame(#'v1_0.disposition'{role = ?AMQP_ROLE_RECEIVER, reply_frames(Reply, State) end; -handle_frame(#'v1_0.attach'{handle = ?UINT(Handle)} = Attach, - #state{cfg = #cfg{max_handle = MaxHandle}} = State) -> - ok = validate_attach(Attach), - case Handle > MaxHandle of - true -> - protocol_error(?V_1_0_CONNECTION_ERROR_FRAMING_ERROR, - "link handle value (~b) exceeds maximum link handle value (~b)", - [Handle, MaxHandle]); - false -> - handle_attach(Attach, State) +handle_frame(#'v1_0.attach'{handle = ?UINT(Handle)}, + #state{cfg = #cfg{max_handle = MaxHandle}}) + when Handle > MaxHandle -> + protocol_error(?V_1_0_CONNECTION_ERROR_FRAMING_ERROR, + "link handle value (~b) exceeds maximum link handle value (~b)", + [Handle, MaxHandle]); +handle_frame(#'v1_0.attach'{name = {utf8, NameBin} = Name, + handle = Handle, + role = Role, + source = Source, + target = Target} = Attach, + State) -> + try + ok = validate_attach(Attach), + handle_attach(Attach, State) + catch + {link_error, Error} -> + %% Figure 2.33 + ?LOG_WARNING("refusing link '~ts': ~tp", [NameBin, Error]), + AttachReply = case Role of + ?AMQP_ROLE_SENDER -> + #'v1_0.attach'{ + name = Name, + handle = Handle, + role = ?AMQP_ROLE_RECEIVER, + source = Source, + target = null}; + ?AMQP_ROLE_RECEIVER -> + #'v1_0.attach'{ + name = Name, + handle = Handle, + role = ?AMQP_ROLE_SENDER, + source = null, + target = Target, + initial_delivery_count = ?UINT(?INITIAL_DELIVERY_COUNT)} + end, + Detach = #'v1_0.detach'{handle = Handle, + closed = true, + error = Error}, + {ok, [AttachReply, Detach], State} end; handle_frame(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)}, @@ -1226,9 +1259,9 @@ handle_attach(#'v1_0.attach'{ Pair#management_link_pair{incoming_half = HandleInt}, Pairs0); #{LinkName := Other} -> - protocol_error(?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, - "received invalid attach ~p for management link pair ~p", - [Attach, Other]); + link_error(?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, + "received invalid attach ~p for management link pair ~p", + [Attach, Other]); _ -> maps:put(LinkName, #management_link_pair{client_terminus_address = ClientTerminusAddress, @@ -1283,9 +1316,9 @@ handle_attach(#'v1_0.attach'{ Pair#management_link_pair{outgoing_half = HandleInt}, Pairs0); #{LinkName := Other} -> - protocol_error(?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, - "received invalid attach ~p for management link pair ~p", - [Attach, Other]); + link_error(?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, + "received invalid attach ~p for management link pair ~p", + [Attach, Other]); _ -> maps:put(LinkName, #management_link_pair{client_terminus_address = ClientTerminusAddress, @@ -1369,9 +1402,9 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, rabbit_global_counters:publisher_created(?PROTOCOL), reply_frames([Reply, Flow], State); {error, Reason} -> - protocol_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, - "Attach rejected: ~tp", - [Reason]) + link_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, + "Attach refused: ~tp", + [Reason]) end; handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, @@ -1403,7 +1436,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, case ensure_source(Source0, LinkNameBin, Vhost, User, ContainerId, ReaderPid, PermCache0, TopicPermCache0) of {error, Reason} -> - protocol_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, "Attach rejected: ~tp", [Reason]); + link_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, "Attach refused: ~tp", [Reason]); {ok, QName = #resource{name = QNameBin}, Source, PermCache1, TopicPermCache} -> PermCache = check_resource_access(QName, read, User, PermCache1), case rabbit_amqqueue:with( @@ -1412,7 +1445,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) catch exit:#amqp_error{name = resource_locked} -> %% An exclusive queue can only be consumed from by its declaring connection. - protocol_error( + link_error( ?V_1_0_AMQP_ERROR_RESOURCE_LOCKED, "cannot obtain exclusive access to locked ~s", [rabbit_misc:rs(QName)]) @@ -1515,21 +1548,26 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, rabbit_global_counters:consumer_created(?PROTOCOL), {ok, [A], State1}; {error, _Type, Reason, Args} -> - protocol_error( - ?V_1_0_AMQP_ERROR_INTERNAL_ERROR, - Reason, Args) + link_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, + Reason, Args) end end) of {ok, Reply, State} -> reply_frames(Reply, State); {error, Reason} -> - protocol_error( - ?V_1_0_AMQP_ERROR_INTERNAL_ERROR, - "Could not operate on ~s: ~tp", - [rabbit_misc:rs(QName), Reason]) + link_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, + "Could not operate on ~ts: ~tp", + [rabbit_misc:rs(QName), Reason]) end end. +-spec link_error(term(), io:format(), [term()]) -> + no_return(). +link_error(Condition, Msg, Args) -> + Description = unicode:characters_to_binary(lists:flatten(io_lib:format(Msg, Args))), + throw({link_error, #'v1_0.error'{condition = Condition, + description = {utf8, Description}}}). + send_pending(#state{remote_incoming_window = RemoteIncomingWindow, outgoing_pending = Buf0 } = State) -> @@ -2493,10 +2531,10 @@ incoming_link_transfer( multi_transfer_msg = undefined}, {ok, Reply, Link, State}; {error, Reason} -> - protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, - "Failed to deliver message to queues, " - "delivery_tag=~p, delivery_id=~p, reason=~p", - [DeliveryTag, DeliveryId, Reason]) + link_error( + ?V_1_0_AMQP_ERROR_INTERNAL_ERROR, + "failed to deliver message (delivery-tag=~p, delivery-id=~p): ~tp", + [DeliveryTag, DeliveryId, Reason]) end; {error, {anonymous_terminus, false}, #'v1_0.error'{} = Err} -> Disposition = case Settled of @@ -2721,7 +2759,7 @@ ensure_source(Source = #'v1_0.source'{address = Address, try cow_uri:urldecode(QNameBinQuoted) of QNameBin -> QName = queue_resource(Vhost, QNameBin), - ok = exit_if_absent(QName), + ok = error_if_absent(QName), {ok, QName, Source, PermCache, TopicPermCache} catch error:_ -> {error, {bad_address, Address}} @@ -2870,7 +2908,8 @@ check_exchange(XNameBin, User, Vhost, PermCache0) -> end, {ok, Exchange, PermCache}; {error, not_found} -> - exit_not_found(XName) + link_error(?V_1_0_AMQP_ERROR_NOT_FOUND, + "no ~ts", [rabbit_misc:rs(XName)]) end. address_v1_permitted() -> @@ -2897,7 +2936,7 @@ ensure_target_v2({utf8, String}, Vhost) -> {ok, _XNameBin, _RKey, undefined} = Ok -> Ok; {ok, _XNameBin, _RKey, QNameBin} = Ok -> - ok = exit_if_absent(queue, Vhost, QNameBin), + ok = error_if_absent(queue, Vhost, QNameBin), Ok; {error, bad_address} -> {error, {bad_address_string, String}} @@ -3325,8 +3364,8 @@ validate_multi_transfer_delivery_id(undefined, _FirstDeliveryId) -> validate_multi_transfer_delivery_id(OtherId, FirstDeliveryId) -> %% "It is an error if the delivery-id on a continuation transfer %% differs from the delivery-id on the first transfer of a delivery." - protocol_error( - ?V_1_0_CONNECTION_ERROR_FRAMING_ERROR, + link_error( + ?V_1_0_AMQP_ERROR_INVALID_FIELD, "delivery-id of continuation transfer (~p) differs from delivery-id on first transfer (~p)", [OtherId, FirstDeliveryId]). @@ -3338,8 +3377,8 @@ validate_multi_transfer_settled(undefined, Settled) ok; validate_multi_transfer_settled(Other, First) when is_boolean(First) -> - protocol_error( - ?V_1_0_CONNECTION_ERROR_FRAMING_ERROR, + link_error( + ?V_1_0_AMQP_ERROR_INVALID_FIELD, "field 'settled' of continuation transfer (~p) differs from " "(interpreted) field 'settled' on first transfer (~p)", [Other, First]). @@ -3355,8 +3394,8 @@ validate_transfer_snd_settle_mode(settled, true) -> %% then this field MUST be true on at least one transfer frame for a delivery" [2.7.5] ok; validate_transfer_snd_settle_mode(SndSettleMode, Settled) -> - protocol_error( - ?V_1_0_CONNECTION_ERROR_FRAMING_ERROR, + link_error( + ?V_1_0_AMQP_ERROR_INVALID_FIELD, "sender settle mode is '~s' but transfer settled flag is interpreted as being '~s'", [SndSettleMode, Settled]). @@ -3397,7 +3436,7 @@ validate_message_size(Msg, MaxMsgSize) -> {undefined | rabbit_misc:resource_name(), permission_cache()}. ensure_terminus(Type, {exchange, {XNameList, _RoutingKey}}, Vhost, User, Durability, PermCache) -> - ok = exit_if_absent(exchange, Vhost, XNameList), + ok = error_if_absent(exchange, Vhost, XNameList), case Type of target -> {undefined, PermCache}; source -> declare_queue_v1(generate_queue_name_v1(), Vhost, User, Durability, PermCache) @@ -3420,22 +3459,25 @@ ensure_terminus(_, {amqqueue, QNameList}, Vhost, _, _, PermCache) -> %% standardised queues. The client MAY declare a queue starting with "amq." %% if the passive option is set, or the queue already exists." QNameBin = unicode:characters_to_binary(QNameList), - ok = exit_if_absent(queue, Vhost, QNameBin), + ok = error_if_absent(queue, Vhost, QNameBin), {QNameBin, PermCache}. -exit_if_absent(Kind, Vhost, Name) when is_list(Name) -> - exit_if_absent(Kind, Vhost, unicode:characters_to_binary(Name)); -exit_if_absent(Kind, Vhost, Name) when is_binary(Name) -> - exit_if_absent(rabbit_misc:r(Vhost, Kind, Name)). +error_if_absent(Kind, Vhost, Name) when is_list(Name) -> + error_if_absent(Kind, Vhost, unicode:characters_to_binary(Name)); +error_if_absent(Kind, Vhost, Name) when is_binary(Name) -> + error_if_absent(rabbit_misc:r(Vhost, Kind, Name)). -exit_if_absent(ResourceName = #resource{kind = Kind}) -> +error_if_absent(Resource = #resource{kind = Kind}) -> Mod = case Kind of exchange -> rabbit_exchange; queue -> rabbit_amqqueue end, - case Mod:exists(ResourceName) of - true -> ok; - false -> exit_not_found(ResourceName) + case Mod:exists(Resource) of + true -> + ok; + false -> + link_error(?V_1_0_AMQP_ERROR_NOT_FOUND, + "no ~ts", [rabbit_misc:rs(Resource)]) end. generate_queue_name_v1() -> @@ -3481,14 +3523,13 @@ declare_queue(QNameBin, {existing, _Q} -> ok; {error, queue_limit_exceeded, Reason, ReasonArgs} -> - protocol_error( - ?V_1_0_AMQP_ERROR_RESOURCE_LIMIT_EXCEEDED, - Reason, - ReasonArgs); + link_error(?V_1_0_AMQP_ERROR_RESOURCE_LIMIT_EXCEEDED, + Reason, + ReasonArgs); Other -> - protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, - "Failed to declare ~s: ~p", - [rabbit_misc:rs(QName), Other]) + link_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, + "Failed to declare ~s: ~p", + [rabbit_misc:rs(QName), Other]) end, {ok, PermCache}. @@ -3684,9 +3725,9 @@ maybe_detach_mgmt_link( check_internal_exchange(#exchange{internal = true, name = XName}) -> - protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, - "forbidden to publish to internal ~ts", - [rabbit_misc:rs(XName)]); + link_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + "forbidden to publish to internal ~ts", + [rabbit_misc:rs(XName)]); check_internal_exchange(_) -> ok. @@ -3794,7 +3835,7 @@ check_user_id(Mc, User) -> ok -> ok; {refused, Reason, Args} -> - protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, Reason, Args) + link_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, Reason, Args) end. maps_update_with(Key, Fun, Init, Map) -> @@ -3822,15 +3863,15 @@ check_paired({map, Properties}) -> true -> ok; false -> - exit_property_paired_not_set() + error_property_paired_not_set() end; check_paired(_) -> - exit_property_paired_not_set(). + error_property_paired_not_set(). --spec exit_property_paired_not_set() -> no_return(). -exit_property_paired_not_set() -> - protocol_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, - "Link property 'paired' is not set to boolean value 'true'", []). +-spec error_property_paired_not_set() -> no_return(). +error_property_paired_not_set() -> + link_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, + "Link property 'paired' is not set to boolean value 'true'", []). -spec exit_not_implemented(io:format()) -> no_return(). exit_not_implemented(Format) -> @@ -3838,13 +3879,7 @@ exit_not_implemented(Format) -> -spec exit_not_implemented(io:format(), [term()]) -> no_return(). exit_not_implemented(Format, Args) -> - protocol_error(?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED, Format, Args). - --spec exit_not_found(rabbit_types:r(exchange | queue)) -> no_return(). -exit_not_found(Resource) -> - protocol_error(?V_1_0_AMQP_ERROR_NOT_FOUND, - "no ~ts", - [rabbit_misc:rs(Resource)]). + link_error(?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED, Format, Args). -spec error_not_found(rabbit_types:r(exchange | queue)) -> #'v1_0.error'{}. error_not_found(Resource) -> diff --git a/deps/rabbit/test/amqp_address_SUITE.erl b/deps/rabbit/test/amqp_address_SUITE.erl index a974675bb17a..50adfa8a9344 100644 --- a/deps/rabbit/test/amqp_address_SUITE.erl +++ b/deps/rabbit/test/amqp_address_SUITE.erl @@ -21,9 +21,11 @@ -import(amqp_utils, [connection_config/1, flush/1, - wait_for_credit/1]). + wait_for_credit/1, + end_session_sync/1 + ]). --define(TIMEOUT, 30_000). +-define(TIMEOUT, 9000). all() -> [ @@ -206,24 +208,30 @@ target_exchange_absent(Config) -> XName = <<"🎈"/utf8>>, TargetAddr = rabbitmq_amqp_address:exchange(XName), - OpnConf = connection_config(Config), + OpnConf0 = connection_config(Config), + OpnConf = OpnConf0#{notify_with_performative => true}, {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session_sync(Connection), - {ok, _Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr), - receive - {amqp10_event, - {session, Session, - {ended, - #'v1_0.error'{ - condition = ?V_1_0_AMQP_ERROR_NOT_FOUND, - description = {utf8, <<"no exchange '", XName:(byte_size(XName))/binary, - "' in vhost '/'">>}}}}} -> ok - after ?TIMEOUT -> - Reason = {missing_event, ?LINE}, - flush(Reason), - ct:fail(Reason) + %% RabbitMQ should refuse the link. + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr), + receive {amqp10_event, {link, Sender, {attached, #'v1_0.attach'{target = Target}}}} -> + ?assertNotMatch(#'v1_0.target'{}, Target) + after 9000 -> ct:fail({missing_event, ?LINE}) end, + receive {amqp10_event, {link, Sender, {detached, #'v1_0.detach'{closed = Closed, + error = Error}}}} -> + ?assert(Closed), + ?assertEqual( + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_NOT_FOUND, + description = {utf8, <<"no exchange '", XName:(byte_size(XName))/binary, + "' in vhost '/'">>}}, + Error) + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection). %% Test v2 target and source address @@ -269,11 +277,11 @@ target_queue_absent(Config) -> {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session_sync(Connection), - {ok, _Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr), receive {amqp10_event, - {session, Session, - {ended, + {link, Sender, + {detached, #'v1_0.error'{ condition = ?V_1_0_AMQP_ERROR_NOT_FOUND, description = {utf8, <<"no queue '", QName:(byte_size(QName))/binary, @@ -283,6 +291,7 @@ target_queue_absent(Config) -> flush(Reason), ct:fail(Reason) end, + ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection). %% Test v2 target address 'null' and 'to' @@ -563,17 +572,18 @@ target_bad_address0(TargetAddress, Config) -> {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session_sync(Connection), - {ok, _Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddress), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddress), receive {amqp10_event, - {session, Session, - {ended, + {link, Sender, + {detached, #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD}}}} -> ok after ?TIMEOUT -> Reason = {missing_event, ?LINE, TargetAddress}, flush(Reason), ct:fail(Reason) end, + ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection). %% Test v2 source address @@ -587,11 +597,11 @@ source_queue_absent(Config) -> {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session_sync(Connection), - {ok, _Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, SourceAddr), + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, SourceAddr), receive {amqp10_event, - {session, Session, - {ended, + {link, Receiver, + {detached, #'v1_0.error'{ condition = ?V_1_0_AMQP_ERROR_NOT_FOUND, description = {utf8, <<"no queue '", QName:(byte_size(QName))/binary, @@ -601,6 +611,7 @@ source_queue_absent(Config) -> flush(Reason), ct:fail(Reason) end, + ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection). source_bad_address(Config) -> @@ -623,17 +634,18 @@ source_bad_address0(SourceAddress, Config) -> {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session_sync(Connection), - {ok, _Receiver} = amqp10_client:attach_receiver_link(Session, <<"sender">>, SourceAddress), + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"sender">>, SourceAddress), receive {amqp10_event, - {session, Session, - {ended, + {link, Receiver, + {detached, #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD}}}} -> ok after ?TIMEOUT -> Reason = {missing_event, ?LINE}, flush(Reason), ct:fail(Reason) end, + ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection). init(Config) -> diff --git a/deps/rabbit/test/amqp_auth_SUITE.erl b/deps/rabbit/test/amqp_auth_SUITE.erl index 389a37b2d5c7..49f5b0c40d2c 100644 --- a/deps/rabbit/test/amqp_auth_SUITE.erl +++ b/deps/rabbit/test/amqp_auth_SUITE.erl @@ -27,6 +27,7 @@ [web_amqp/1, flush/1, wait_for_credit/1, + end_session_sync/1, close_connection_sync/1]). all() -> @@ -411,16 +412,17 @@ v1_attach_target_internal_exchange(Config) -> {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session_sync(Connection), Address = <<"/exchange/", XName/binary, "/some-routing-key">>, - {ok, _} = amqp10_client:attach_sender_link( - Session, <<"test-sender">>, Address), + {ok, Sender} = amqp10_client:attach_sender_link( + Session, <<"test-sender">>, Address), ExpectedErr = error_unauthorized( <<"forbidden to publish to internal exchange 'test exchange' in vhost '/'">>), - receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok - after ?TIMEOUT -> flush(missing_ended), - ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") + receive {amqp10_event, {link, Sender, {detached, ExpectedErr}}} -> ok + after ?TIMEOUT -> flush(missing_event), + ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, - ok = amqp10_client:close_connection(Connection), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection), #'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = XName}), ok = rabbit_ct_client_helpers:close_channel(Ch). @@ -622,24 +624,23 @@ target_per_message_internal_exchange(Config) -> To = rabbitmq_amqp_address:exchange(XName), ok = set_permissions(Config, XName, XName, <<>>), - {Conn1, Session1, LinkPair1} = init_pair(Config), - ok = rabbitmq_amqp_client:declare_exchange(LinkPair1, XName, XProps), - {ok, Sender} = amqp10_client:attach_sender_link_sync(Session1, <<"sender">>, TargetAddress), + {_, Session, LinkPair} = Init = init_pair(Config), + ok = rabbitmq_amqp_client:declare_exchange(LinkPair, XName, XProps), + {ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, TargetAddress), ok = wait_for_credit(Sender), Tag = <<"tag">>, Msg = amqp10_msg:set_properties(#{to => To}, amqp10_msg:new(Tag, <<"msg">>, true)), ok = amqp10_client:send_msg(Sender, Msg), ExpectedErr = error_unauthorized( - <<"forbidden to publish to internal exchange '", XName/binary, "' in vhost 'test vhost'">>), - receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok + <<"forbidden to publish to internal exchange '", + XName/binary, "' in vhost 'test vhost'">>), + receive {amqp10_event, {link, Sender, {detached, ExpectedErr}}} -> ok after ?TIMEOUT -> flush(missing_event), - ct:fail({missing_event, ?LINE}) + ct:fail({missing_event, ?LINE}) end, - ok = close_connection_sync(Conn1), - Init = {_, _, LinkPair2} = init_pair(Config), - ok = rabbitmq_amqp_client:delete_exchange(LinkPair2, XName), + ok = rabbitmq_amqp_client:delete_exchange(LinkPair, XName), ok = cleanup_pair(Init). target_per_message_topic(Config) -> @@ -822,14 +823,14 @@ v1_vhost_queue_limit(Config) -> {ok, C1} = amqp10_client:open_connection(OpnConf1), {ok, Session1} = amqp10_client:begin_session_sync(C1), TargetAddress = <<"/queue/", QName/binary>>, - {ok, _Sender1} = amqp10_client:attach_sender_link( - Session1, <<"test-sender-1">>, TargetAddress), + {ok, Sender1} = amqp10_client:attach_sender_link( + Session1, <<"test-sender-1">>, TargetAddress), ExpectedErr = amqp_error( ?V_1_0_AMQP_ERROR_RESOURCE_LIMIT_EXCEEDED, <<"cannot declare queue 'q1': queue limit in vhost 'test vhost' (0) is reached">>), - receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok - after ?TIMEOUT -> flush(missing_ended), - ct:fail("did not receive expected error") + receive {amqp10_event, {link, Sender1, {detached, ExpectedErr}}} -> ok + after ?TIMEOUT -> flush(missing_event), + ct:fail("did not receive expected error") end, OpnConf2 = connection_config(Config, <<"/">>), diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 82e58829422c..c7913b4415fd 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -65,7 +65,7 @@ groups() -> quorum_queue_rejects, receiver_settle_mode_first, publishing_to_non_existing_queue_should_settle_with_released, - open_link_to_non_existing_destination_should_end_session, + attach_link_to_non_existing_destination, roundtrip_with_drain_classic_queue, roundtrip_with_drain_quorum_queue, roundtrip_with_drain_stream, @@ -994,46 +994,46 @@ durable_field(Config, QueueType, QName) invalid_transfer_settled_flag(Config) -> OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session1} = amqp10_client:begin_session(Connection), - {ok, Session2} = amqp10_client:begin_session(Connection), + {ok, Session} = amqp10_client:begin_session(Connection), TargetAddr = rabbitmq_amqp_address:exchange(<<"amq.fanout">>), {ok, SenderSettled} = amqp10_client:attach_sender_link_sync( - Session1, <<"link 1">>, TargetAddr, settled), + Session, <<"link 1">>, TargetAddr, settled), {ok, SenderUnsettled} = amqp10_client:attach_sender_link_sync( - Session2, <<"link 2">>, TargetAddr, unsettled), + Session, <<"link 2">>, TargetAddr, unsettled), ok = wait_for_credit(SenderSettled), ok = wait_for_credit(SenderUnsettled), ok = amqp10_client:send_msg(SenderSettled, amqp10_msg:new(<<"tag1">>, <<"m1">>, false)), receive {amqp10_event, - {session, Session1, - {ended, + {link, SenderSettled, + {detached, #'v1_0.error'{ - condition = ?V_1_0_CONNECTION_ERROR_FRAMING_ERROR, + condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD, description = {utf8, Description1}}}}} -> ?assertEqual( <<"sender settle mode is 'settled' but transfer settled flag is interpreted as being 'false'">>, Description1) - after 30000 -> flush(missing_ended), + after 9000 -> flush(missing_event), ct:fail({missing_event, ?LINE}) end, ok = amqp10_client:send_msg(SenderUnsettled, amqp10_msg:new(<<"tag2">>, <<"m2">>, true)), receive {amqp10_event, - {session, Session2, - {ended, + {link, SenderUnsettled, + {detached, #'v1_0.error'{ - condition = ?V_1_0_CONNECTION_ERROR_FRAMING_ERROR, + condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD, description = {utf8, Description2}}}}} -> ?assertEqual( <<"sender settle mode is 'unsettled' but transfer settled flag is interpreted as being 'true'">>, Description2) - after 30000 -> flush(missing_ended), + after 9000 -> flush(missing_event), ct:fail({missing_event, ?LINE}) end, + ok = end_session_sync(Session), ok = close_connection_sync(Connection). quorum_queue_rejects(Config) -> @@ -1188,23 +1188,27 @@ publishing_to_non_existing_queue_should_settle_with_released(Config) -> ok = close_connection_sync(Connection), ok = flush("post sender close"). -open_link_to_non_existing_destination_should_end_session(Config) -> +attach_link_to_non_existing_destination(Config) -> OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), Name = atom_to_binary(?FUNCTION_NAME), Addresses = [rabbitmq_amqp_address:exchange(Name, <<"bar">>), rabbitmq_amqp_address:queue(Name)], - SenderLinkName = <<"test-sender">>, [begin - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session} = amqp10_client:begin_session_sync(Connection), - ct:pal("Address ~s", [Address]), - {ok, _} = amqp10_client:attach_sender_link( - Session, SenderLinkName, Address), - wait_for_session_end(Session), - ok = close_connection_sync(Connection), - flush("post sender close") + ct:pal("Address ~ts", [Address]), + {ok, Sender} = amqp10_client:attach_sender_link( + Session, Name, Address), + receive {amqp10_event, {link, Sender, {detached, Error}}} -> + ?assertMatch( + #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_NOT_FOUND}, + Error) + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + flush("sender detached") end || Address <- Addresses], - ok. + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). roundtrip_with_drain_classic_queue(Config) -> QName = atom_to_binary(?FUNCTION_NAME), @@ -4660,15 +4664,16 @@ user_id(Config) -> ok = amqp10_client:send_msg(Sender, Msg2), receive {amqp10_event, - {session, Session, - {ended, + {link, Sender, + {detached, #'v1_0.error'{ condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, description = {utf8, <<"user_id property set to 'fake user' but authenticated user was 'guest'">>}}}}} -> ok - after 30000 -> flush(missing_ended), + after 9000 -> flush(missing_detached), ct:fail("did not receive expected error") end, + ok = end_session_sync(Session), ok = close_connection_sync(Connection). message_ttl(Config) -> @@ -4863,7 +4868,7 @@ credential_expires(Config) -> ?assert(rpc(Config, meck, validate, [Mod])), ok = rpc(Config, meck, unload, [Mod]). -%% Attaching to an exclusive source queue should fail. +%% Attaching to an exclusive source queue on a non-declaring connection should fail. attach_to_exclusive_queue(Config) -> QName = <<"my queue">>, {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), @@ -4875,18 +4880,19 @@ attach_to_exclusive_queue(Config) -> {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session_sync(Connection), Address = rabbitmq_amqp_address:queue(QName), - {ok, _Receiver} = amqp10_client:attach_receiver_link(Session, <<"test-receiver">>, Address), + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"test-receiver">>, Address), receive {amqp10_event, - {session, Session, - {ended, + {link, Receiver, + {detached, #'v1_0.error'{ condition = ?V_1_0_AMQP_ERROR_RESOURCE_LOCKED, description = {utf8, <<"cannot obtain exclusive access to locked " "queue 'my queue' in vhost '/'">>}}}}} -> ok - after 30000 -> ct:fail({missing_event, ?LINE}) + after 9000 -> ct:fail({missing_event, ?LINE}) end, + ok = end_session_sync(Session), ok = close_connection_sync(Connection), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). @@ -6772,8 +6778,7 @@ attach_to_down_quorum_queue(Config) -> %% Create quorum queue with single replica on node 2. {_, _, LinkPair2} = Init2 = init(2, Config), {ok, _} = rabbitmq_amqp_client:declare_queue( - LinkPair2, - QName, + LinkPair2, QName, #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, <<"x-quorum-initial-group-size">> => {ulong, 1} }}), @@ -6782,16 +6787,14 @@ attach_to_down_quorum_queue(Config) -> %% Make quorum queue unavailable. ok = rabbit_ct_broker_helpers:stop_broker(Config, 2), - OpnConf = connection_config(0, Config), - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session0} = amqp10_client:begin_session_sync(Connection), + {_, Session, LinkPair0} = Init0 = init(0, Config), flush(attaching_receiver), - {ok, _Receiver} = amqp10_client:attach_receiver_link( - Session0, <<"receiver">>, Address), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address), receive {amqp10_event, - {session, Session0, - {ended, + {link, Receiver, + {detached, #'v1_0.error'{ condition = ?V_1_0_AMQP_ERROR_INTERNAL_ERROR, description = {utf8, Desc}}}}} -> @@ -6803,12 +6806,8 @@ attach_to_down_quorum_queue(Config) -> end, ok = rabbit_ct_broker_helpers:start_broker(Config, 2), - - {ok, Session} = amqp10_client:begin_session_sync(Connection), - {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync( - Session, <<"my link pair">>), - {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), - ok = close({Connection, Session, LinkPair}). + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair0, QName), + ok = close(Init0). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% internal diff --git a/deps/rabbit/test/amqp_dotnet_SUITE.erl b/deps/rabbit/test/amqp_dotnet_SUITE.erl index af55bb773e68..32ab6039269e 100644 --- a/deps/rabbit/test/amqp_dotnet_SUITE.erl +++ b/deps/rabbit/test/amqp_dotnet_SUITE.erl @@ -35,7 +35,10 @@ groups() -> redelivery, released, routing, - invalid_routes, + attach_sender_to_missing_exchange, + attach_sender_to_invalid_address, + attach_receiver_from_missing_queue, + attach_receiver_from_invalid_address, auth_failure, access_failure_not_allowed, access_failure_send, @@ -176,7 +179,16 @@ routing(Config) -> }), run(?FUNCTION_NAME, Config). -invalid_routes(Config) -> +attach_sender_to_missing_exchange(Config) -> + run(?FUNCTION_NAME, Config). + +attach_sender_to_invalid_address(Config) -> + run(?FUNCTION_NAME, Config). + +attach_receiver_from_missing_queue(Config) -> + run(?FUNCTION_NAME, Config). + +attach_receiver_from_invalid_address(Config) -> run(?FUNCTION_NAME, Config). auth_failure(Config) -> diff --git a/deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs b/deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs index 3aa07b5274ee..f71431e97e41 100755 --- a/deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs +++ b/deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs @@ -118,24 +118,25 @@ module Test = Guid("f275ea5e-0c57-4ad7-b11a-b20c563d3b71") :> obj ] - let testOutcome uri (attach: Attach) (cond: string) = + let testOutcome (uri:string) (attach:Attach) (expectedCond:string) = use ac = connectAnon uri - let trySet (mre: AutoResetEvent) = - try mre.Set() |> ignore with _ -> () - - use mre = new System.Threading.AutoResetEvent(false) - let mutable errorName = null - ac.Session.add_Closed ( - new ClosedCallback (fun o err -> errorName <- string err.Condition; trySet mre)) - - let attached = new OnAttached ( - fun l attach -> errorName <- null; trySet mre) - - let receiver = ReceiverLink(ac.Session, "test-receiver", attach, attached) - mre.WaitOne(1000) |> ignore - if cond = null then + let mutable errorName : string = null + use attachedEv = new AutoResetEvent(false) + use closedEv = new AutoResetEvent(false) + let onAttached = new OnAttached (fun _ _ -> attachedEv.Set() |> ignore) + let receiver = new ReceiverLink(ac.Session, "test-receiver", attach, onAttached) + receiver.add_Closed( + new ClosedCallback (fun _ err -> + if not (isNull err) then errorName <- string err.Condition + closedEv.Set() |> ignore)) + + if not (attachedEv.WaitOne(9000)) then failwith "Expected broker to reply with attach frame" + if isNull expectedCond then + if closedEv.WaitOne(5) then failwith "Unexpected link refusal" receiver.Close() - assertEqual cond errorName + else + if not (closedEv.WaitOne(9000)) then failwith "Expected link refusal, but link didn't close." + assertEqual expectedCond errorName let no_routes_is_released uri = // tests that a message sent to an exchange that resolves no routes for the @@ -416,33 +417,121 @@ module Test = assertTrue (m' <> null) assertEqual (m.Body :?> int) (m'.Body :?> int) - let invalidRoutes uri = + // Test attaching a sender to a non-existent exchange + // Expect: Link refused with amqp:not-found error + let attachSenderToMissingExchange uri = + use ac = connectAnon uri + use attachEvent = new System.Threading.AutoResetEvent(false) + use detachEvent = new System.Threading.AutoResetEvent(false) + let mutable linkError : Error = null + + let attached = new OnAttached (fun link attach -> + if attach.Target = null then + attachEvent.Set() |> ignore + else + failwith "Expected null target in attach response" + ) + + let sender = new SenderLink(ac.Session, "test-sender", + Target(Address = "/exchanges/missing"), attached) + + sender.add_Closed(new ClosedCallback(fun _ err -> + linkError <- err + detachEvent.Set() |> ignore)) + + assertTrue (attachEvent.WaitOne(9000)) + assertTrue (detachEvent.WaitOne(9000)) + + assertNotNull linkError + assertEqual (Symbol "amqp:not-found") linkError.Condition + assertTrue (not ac.Session.IsClosed) + + // Test attaching a sender to an invalid address format + // Expect: Link refused with amqp:invalid-field error + let attachSenderToInvalidAddress uri = + use ac = connectAnon uri + use attachEvent = new System.Threading.AutoResetEvent(false) + use detachEvent = new System.Threading.AutoResetEvent(false) + let mutable linkError : Error = null + + let attached = new OnAttached (fun link attach -> + if attach.Target = null then + attachEvent.Set() |> ignore + else + failwith "Expected null target in attach response" + ) + + let sender = new SenderLink(ac.Session, "test-sender", + Target(Address = "/fruit/orange"), attached) + + sender.add_Closed(new ClosedCallback(fun _ err -> + linkError <- err + detachEvent.Set() |> ignore)) + + assertTrue (attachEvent.WaitOne(9000)) + assertTrue (detachEvent.WaitOne(9000)) + + assertNotNull linkError + assertEqual (Symbol "amqp:invalid-field") linkError.Condition + assertTrue (not ac.Session.IsClosed) + + // Test attaching a receiver to a non-existent queue + // Expect: Link refused with amqp:not-found error + let attachReceiverFromMissingQueue uri = + use ac = connectAnon uri + use attachEvent = new System.Threading.AutoResetEvent(false) + use detachEvent = new System.Threading.AutoResetEvent(false) + let mutable linkError : Error = null + + let attached = new OnAttached (fun link attach -> + if attach.Source = null then + attachEvent.Set() |> ignore + else + failwith "Expected null source in attach response" + ) + + let receiver = new ReceiverLink(ac.Session, "test-receiver", + Source(Address = "/queues/missing"), attached) + + receiver.add_Closed(new ClosedCallback(fun _ err -> + linkError <- err + detachEvent.Set() |> ignore)) + + assertTrue (attachEvent.WaitOne(9000)) + assertTrue (detachEvent.WaitOne(9000)) + + assertNotNull linkError + assertEqual (Symbol "amqp:not-found") linkError.Condition + assertTrue (not ac.Session.IsClosed) + + // Test attaching a receiver from an invalid address format + // Expect: Link refused with amqp:invalid-field error + let attachReceiverFromInvalidAddress uri = + use ac = connectAnon uri + use attachEvent = new System.Threading.AutoResetEvent(false) + use detachEvent = new System.Threading.AutoResetEvent(false) + let mutable linkError : Error = null - for addr, cond in - ["/exchanges/missing", "amqp:not-found" - "/fruit/orange", "amqp:invalid-field"] do - use ac = connectAnon uri - let trySet (mre: AutoResetEvent) = - try mre.Set() |> ignore with _ -> () + let attached = new OnAttached (fun link attach -> + if attach.Source = null then + attachEvent.Set() |> ignore + else + failwith "Expected null source in attach response" + ) - let mutable errorName = null - use mre = new System.Threading.AutoResetEvent(false) - ac.Session.add_Closed ( - new ClosedCallback (fun _ err -> errorName <- err.Condition; trySet mre)) + let receiver = new ReceiverLink(ac.Session, "test-receiver", + Source(Address = "/fruit/orange"), attached) - let attached = new OnAttached (fun _ _ -> trySet mre) + receiver.add_Closed(new ClosedCallback(fun _ err -> + linkError <- err + detachEvent.Set() |> ignore)) - let sender = new SenderLink(ac.Session, "test-sender", - Target(Address = addr), attached); - mre.WaitOne() |> ignore + assertTrue (attachEvent.WaitOne(9000)) + assertTrue (detachEvent.WaitOne(9000)) - try - let receiver = ReceiverLink(ac.Session, "test-receiver", addr) - receiver.Close() - with - | :? Amqp.AmqpException as ae -> - assertEqual (Symbol cond) (ae.Error.Condition) - | _ -> failwith "invalid expection thrown" + assertNotNull linkError + assertEqual (Symbol "amqp:invalid-field") linkError.Condition + assertTrue (not ac.Session.IsClosed) let authFailure uri = try @@ -538,8 +627,17 @@ let main argv = | [AsLower "routing"; uri] -> routing uri 0 - | [AsLower "invalid_routes"; uri] -> - invalidRoutes uri + | [AsLower "attach_sender_to_missing_exchange"; uri] -> + attachSenderToMissingExchange uri + 0 + | [AsLower "attach_sender_to_invalid_address"; uri] -> + attachSenderToInvalidAddress uri + 0 + | [AsLower "attach_receiver_from_missing_queue"; uri] -> + attachReceiverFromMissingQueue uri + 0 + | [AsLower "attach_receiver_from_invalid_address"; uri] -> + attachReceiverFromInvalidAddress uri 0 | [AsLower "streams"; uri] -> streams uri