From 72c42432a506322b3a4942d1a54d010eaf40a3cc Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Mon, 19 Jul 2021 09:19:40 +0200 Subject: [PATCH 1/4] Add websocket support with Gun Expand cowboy routing so /ws[...] are redirected to a Gun Websocket handler. To facilitate this a ws macro is added to matcher that sets up 'route' by generating an ID. This ID is a query parameter appended to the redirect URI and mapped to backend host, port and path. --- config/config.exs | 6 +- lib/dispatcher.ex | 44 ++++--- lib/dispatcher/log.ex | 4 + lib/manipulators/add_x_rewrite_url_header.ex | 6 +- .../remove_accept_encoding_header.ex | 6 +- lib/matcher.ex | 74 +++++++++-- lib/mu_dispatcher.ex | 14 +- lib/plug_router_dispatcher.ex | 4 +- lib/proxy.ex | 8 +- lib/websocket_handler.ex | 122 ++++++++++++++++++ mix.exs | 13 +- mix.lock | 29 +++-- 12 files changed, 269 insertions(+), 61 deletions(-) create mode 100644 lib/websocket_handler.ex diff --git a/config/config.exs b/config/config.exs index 576f57f..ebaa566 100644 --- a/config/config.exs +++ b/config/config.exs @@ -44,7 +44,11 @@ config :dispatcher, # log whenever a layer starts processing log_layer_start_processing: CH.system_boolean("LOG_LAYER_START_PROCESSING"), # log whenever a layer matched, and if no matching layer was found - log_layer_matching: CH.system_boolean("LOG_LAYER_MATCHING") + log_layer_matching: CH.system_boolean("LOG_LAYER_MATCHING"), + log_ws_all: CH.system_boolean("LOG_WS_ALL"), + log_ws_backend: CH.system_boolean("LOG_WS_BACKEND"), + log_ws_frontend: CH.system_boolean("LOG_WS_FRONTEND"), + log_ws_unhandled: CH.system_boolean("LOG_WS_UNHANDLED") # It is also possible to import configuration files, relative to this # directory. For example, you can emulate configuration per environment diff --git a/lib/dispatcher.ex b/lib/dispatcher.ex index d90bb93..be884b4 100644 --- a/lib/dispatcher.ex +++ b/lib/dispatcher.ex @@ -1,11 +1,11 @@ defmodule Dispatcher do use Matcher - define_accept_types [ - text: [ "text/*" ], - html: [ "text/html", "application/xhtml+html" ], - json: [ "application/json", "application/vnd.api+json" ] - ] + define_accept_types( + text: ["text/*"], + html: ["text/html", "application/xhtml+html"], + json: ["application/json", "application/vnd.api+json"] + ) # get "/*_rest", %{ accept: %{ html: true } } do # Proxy.forward conn, [], "http://static/ember-app/index.html" @@ -16,35 +16,39 @@ defmodule Dispatcher do # end post "/hello/erika", %{} do - Plug.Conn.send_resp conn, 401, "FORBIDDEN" + Plug.Conn.send_resp(conn, 401, "FORBIDDEN") end # 200 microservice dispatching - match "/hello/erika", %{ accept: %{ json: true } } do - Plug.Conn.send_resp conn, 200, "{ \"message\": \"Hello Erika\" }" + match "/hello/erika", %{accept: %{json: true}} do + Plug.Conn.send_resp(conn, 200, "{ \"message\": \"Hello Erika\" }\n") end - match "/hello/erika", %{ accept: %{ html: true } } do - Plug.Conn.send_resp conn, 200, "HelloHello Erika" + match "/hello/erika", %{accept: %{html: true}} do + Plug.Conn.send_resp( + conn, + 200, + "HelloHello Erika" + ) end # 404 routes - match "/hello/aad/*_rest", %{ accept: %{ json: true } } do - Plug.Conn.send_resp conn, 200, "{ \"message\": \"Hello Aad\" }" + match "/hello/aad/*_rest", %{accept: %{json: true}} do + Plug.Conn.send_resp(conn, 200, "{ \"message\": \"Hello Aad\" }") end - match "/*_rest", %{ accept: %{ json: true }, last_call: true } do - Plug.Conn.send_resp conn, 404, "{ \"errors\": [ \"message\": \"Not found\", \"status\": 404 } ] }" - end + # Websocket example route + # This forwards to /ws?target=<...> + # Then forwards websocket from /ws?target=<...> to ws://localhost:7999 - match "/*_rest", %{ accept: %{ html: true }, last_call: true } do - Plug.Conn.send_resp conn, 404, "Not foundNo acceptable response found" + match "/ws2" do + ws(conn, "ws://localhost:7999") end - match "/*_rest", %{ last_call: true } do - Plug.Conn.send_resp conn, 404, "No response found" - end + match "__", %{last_call: true} do + send_resp(conn, 404, "Route not found. See config/dispatcher.ex") + end end diff --git a/lib/dispatcher/log.ex b/lib/dispatcher/log.ex index b0a785b..1535234 100644 --- a/lib/dispatcher/log.ex +++ b/lib/dispatcher/log.ex @@ -2,6 +2,10 @@ defmodule Dispatcher.Log do @type log_name :: :log_layer_start_processing | :log_layer_matching + | :log_ws_all + | :log_ws_backend + | :log_ws_frontend + | :log_ws_unhandled @spec log(log_name, any()) :: any() def log(name, content) do diff --git a/lib/manipulators/add_x_rewrite_url_header.ex b/lib/manipulators/add_x_rewrite_url_header.ex index 589083d..5e6f59d 100644 --- a/lib/manipulators/add_x_rewrite_url_header.ex +++ b/lib/manipulators/add_x_rewrite_url_header.ex @@ -2,14 +2,14 @@ defmodule Manipulators.AddXRewriteUrlHeader do @behaviour ProxyManipulator @impl true - def headers( headers, {frontend_conn, _backend_conn} = connection ) do + def headers(headers, {frontend_conn, _backend_conn} = connection) do new_headers = [{"x-rewrite-url", frontend_conn.request_path} | headers] {new_headers, connection} end @impl true - def chunk(_,_), do: :skip + def chunk(_, _), do: :skip @impl true - def finish(_,_), do: :skip + def finish(_, _), do: :skip end diff --git a/lib/manipulators/remove_accept_encoding_header.ex b/lib/manipulators/remove_accept_encoding_header.ex index f55ad70..4e50ea2 100644 --- a/lib/manipulators/remove_accept_encoding_header.ex +++ b/lib/manipulators/remove_accept_encoding_header.ex @@ -3,9 +3,9 @@ defmodule Manipulators.RemoveAcceptEncodingHeader do @impl true def headers(headers, connection) do - headers = - headers - |> Enum.reject( &match?( {"accept_encoding", _}, &1 ) ) + # headers = + # headers + # |> Enum.reject( &match?( {"accept_encoding", _}, &1 ) ) {headers, connection} end diff --git a/lib/matcher.ex b/lib/matcher.ex index d4379ad..e030ff3 100644 --- a/lib/matcher.ex +++ b/lib/matcher.ex @@ -2,15 +2,20 @@ alias Dispatcher.Log defmodule Matcher do defmacro __using__(_opts) do + # Set this attribute _BEFORE_ any code is ran + Module.register_attribute(__CALLER__.module, :websocket, accumulate: true) + quote do require Matcher import Matcher + import Plug.Router, only: [forward: 2] import Plug.Conn, only: [send_resp: 3] import Proxy, only: [forward: 3] def layers do - [ :service, :last_call ] + [:service, :last_call] end + defoverridable layers: 0 def dispatch(conn) do @@ -28,6 +33,34 @@ defmodule Matcher do end end + defmacro ws(conn, host) do + # host = "ws://localhost:8000/test" + + parsed = + URI.parse(host) + |> Log.inspect(:log_ws_all, label: "Creating websocket route") + + id = for _ <- 1..24, into: "", do: <> + + host = parsed.host || "localhost" + port = parsed.port || 80 + path = parsed.path || "/" + + Module.put_attribute(__CALLER__.module, :websocket, %{ + host: host, + port: port, + path: path, + id: id + }) + + # Return redirect things + quote do + unquote(conn) + |> Plug.Conn.resp(:found, "") + |> Plug.Conn.put_resp_header("location", "/ws?target=" <> unquote(id)) + end + end + defmacro get(path, options \\ quote(do: %{}), do: block) do quote do match_method(get, unquote(path), unquote(options), do: unquote(block)) @@ -98,7 +131,6 @@ defmodule Matcher do defmacro __before_compile__(_env) do matchers = Module.get_attribute(__CALLER__.module, :matchers) - # |> IO.inspect(label: "Discovered matchers") |> Enum.map(fn {call, path, options, block} -> make_match_method(call, path, options, block, __CALLER__) end) @@ -110,7 +142,18 @@ defmodule Matcher do end end - [last_match_def | matchers] + socket_dict_f = + quote do + def websockets() do + Enum.reduce(@websocket, %{}, fn x, acc -> Map.put(acc, x.id, x) end) + end + + def get_websocket(id) do + Enum.find(@websocket, fn x -> x.id == id end) + end + end + + [socket_dict_f, last_match_def | matchers] |> Enum.reverse() end @@ -171,24 +214,29 @@ defmodule Matcher do new_accept = case value do - [item] -> # convert item + # convert item + [item] -> {:%{}, [], [{item, true}]} + [_item | _rest] -> raise "Multiple items in accept arrays are not supported." + {:%{}, _, _} -> value end new_list = list - |> Keyword.drop( [:accept] ) - |> Keyword.merge( [accept: new_accept] ) + |> Keyword.drop([:accept]) + |> Keyword.merge(accept: new_accept) {:%{}, any, new_list} else options end - _ -> options + + _ -> + options end end @@ -223,8 +271,6 @@ defmodule Matcher do str -> str end).() - # |> IO.inspect(label: "call name") - # Creates the variable(s) for the parsed path process_derived_path_elements = fn elements -> reversed_elements = Enum.reverse(elements) @@ -310,7 +356,6 @@ defmodule Matcher do def dispatch_call(conn, accept_types, layers_fn, call_handler) do # Extract core info {method, path, accept_header, host} = extract_core_info_from_conn(conn) - # |> IO.inspect(label: "extracted header") # Extract core request info accept_hashes = @@ -321,10 +366,11 @@ defmodule Matcher do # layers |> IO.inspect(label: "layers" ) # Try to find a solution in each of the layers - layers = layers_fn.() - |> Log.inspect(:log_available_layers, "Available layers") + layers = + layers_fn.() + |> Log.inspect(:log_available_layers, "Available layers") - reverse_host = Enum.reverse( host ) + reverse_host = Enum.reverse(host) response_conn = layers @@ -432,7 +478,7 @@ defmodule Matcher do defp sort_and_group_accept_headers(accept) do accept |> safe_parse_accept_header() - # |> IO.inspect(label: "parsed_accept_header") + |> IO.inspect(label: "parsed_accept_header") |> Enum.sort_by(&elem(&1, 3)) |> Enum.group_by(&elem(&1, 3)) |> Map.to_list() diff --git a/lib/mu_dispatcher.ex b/lib/mu_dispatcher.ex index e9a9132..575fb38 100644 --- a/lib/mu_dispatcher.ex +++ b/lib/mu_dispatcher.ex @@ -10,11 +10,23 @@ defmodule MuDispatcher do port = 80 children = [ - {Plug.Cowboy, scheme: :http, plug: PlugRouterDispatcher, options: [port: port]} + # this is kinda strange, but the 'plug:' field is not used when 'dispatch:' is provided (my understanding) + {Plug.Adapters.Cowboy, + scheme: :http, plug: PlugRouterDispatcher, options: [dispatch: dispatch, port: port]} ] Logger.info("Mu Dispatcher starting on port #{port}") Supervisor.start_link(children, strategy: :one_for_one) end + + defp dispatch do + [ + {:_, + [ + {"/ws/[...]", WebsocketHandler, %{}}, + {:_, Plug.Cowboy.Handler, {PlugRouterDispatcher, []}} + ]} + ] + end end diff --git a/lib/plug_router_dispatcher.ex b/lib/plug_router_dispatcher.ex index 128cc40..bd1eaba 100644 --- a/lib/plug_router_dispatcher.ex +++ b/lib/plug_router_dispatcher.ex @@ -1,3 +1,5 @@ +alias Dispatcher.Log + defmodule PlugRouterDispatcher do use Plug.Router @@ -6,6 +8,6 @@ defmodule PlugRouterDispatcher do plug(:dispatch) match _ do - Dispatcher.dispatch( conn ) + Dispatcher.dispatch(conn) end end diff --git a/lib/proxy.ex b/lib/proxy.ex index 9287db1..30fc740 100644 --- a/lib/proxy.ex +++ b/lib/proxy.ex @@ -1,5 +1,8 @@ defmodule Proxy do - @request_manipulators [Manipulators.AddXRewriteUrlHeader,Manipulators.RemoveAcceptEncodingHeader] + @request_manipulators [ + Manipulators.AddXRewriteUrlHeader, + Manipulators.RemoveAcceptEncodingHeader + ] @response_manipulators [Manipulators.AddVaryHeader] @manipulators ProxyManipulatorSettings.make_settings( @request_manipulators, @@ -13,6 +16,7 @@ defmodule Proxy do conn, path, base, - @manipulators) + @manipulators + ) end end diff --git a/lib/websocket_handler.ex b/lib/websocket_handler.ex new file mode 100644 index 0000000..63abc19 --- /dev/null +++ b/lib/websocket_handler.ex @@ -0,0 +1,122 @@ +alias Dispatcher.Log + +defmodule WebsocketHandler do + @behaviour :cowboy_websocket + + def init(req, state) do + # Get path info + {_, target} = + :cowboy_req.parse_qs(req) + |> Enum.find(fn {head, _} -> head == "target" end) + + ws = + Dispatcher.get_websocket(target) + |> Log.inspect(:ws_log_all, label: "websocket connecting to target") + + new_state = + state + |> Map.put(:host, ws.host) + |> Map.put(:path, ws.path) + |> Map.put(:port, ws.port) + |> Map.put(:ready, false) + |> Map.put(:buffer, []) + + {:cowboy_websocket, req, new_state} + end + + def websocket_init(state) do + Log.inspect(state, :log_ws_all, label: "websocket all start connect with") + + connect_opts = %{ + connect_timeout: :timer.minutes(1), + retry: 10, + retry_timeout: 300 + } + + # conn :: pid() + {:ok, conn} = :gun.open(to_charlist(state.host), state.port, connect_opts) + {:ok, :http} = :gun.await_up(conn) + + # streamref :: StreamRef + streamref = :gun.ws_upgrade(conn, to_charlist(state.path)) + + new_state = + state + |> Map.put(:back_pid, conn) + |> Map.put(:back_ref, streamref) + + {:ok, new_state} + end + + def websocket_handle(message, state) do + new_state = + if state.ready do + Log.inspect(message, :log_ws_frontend, label: "websocket frontend message") + |> Log.inspect(:log_ws_all, label: "websocket all frontend message") + + :ok = :gun.ws_send(state.back_pid, state.back_ref, message) + state + else + Log.inspect(message, :log_ws_frontend, + label: "websocket frontend message postponed (connection not started)" + ) + |> Log.inspect(:log_ws_all, + label: "websocket all frontend message postponed (connection not started)" + ) + + buf = [message | state.buffer] + Map.put(state, :buffer, buf) + end + + {:ok, new_state} + end + + def websocket_info({:gun_ws, _pid, _ref, msg}, state) do + Log.inspect(msg, :log_ws_backend, label: "websocket backend message") + |> Log.inspect(:log_ws_all, label: "websocket all backend message") + + {:reply, msg, state} + end + + def websocket_info({:gun_error, _gun_pid, _stream_ref, reason}, _state) do + exit({:ws_upgrade_failed, reason}) + end + + def websocket_info({:gun_response, _gun_pid, _, _, status, headers}, _state) do + Log.inspect({"Websocket upgrade failed.", headers}, :log_ws_all, label: "websocket all") + exit({:ws_upgrade_failed, status, headers}) + end + + def websocket_info({:gun_upgrade, _, _, ["websocket"], headers}, state) do + Log.inspect("ws upgrade succesful", :log_ws_all, label: "websocket all") + Log.inspect(headers, :log_ws_all, label: "websocket all") + + state.buffer + |> Enum.reverse() + |> Enum.each(fn x -> + Log.inspect(x, :log_ws_frontend, label: "postponed sending message") + Log.inspect(x, :log_ws_all, label: "postponed sending message") + :gun.ws_send(state.back_pid, state.back_ref, x) + end) + + new_state = + state + |> Map.put(:ready, true) + |> Map.put(:buffer, []) + + {:ok, new_state} + end + + def websocket_info(info, state) do + Log.inspect(info, :log_ws_unhandled, label: "websocket unhandled info") + |> Log.inspect(:log_ws_all, label: "websocket all info") + + {:ok, state} + end + + def terminate(_reason, _req, state) do + Log.inspect("Closing", :log_ws_all, label: "websocket all") + :gun.shutdown(state.back_pid) + :ok + end +end diff --git a/mix.exs b/mix.exs index 292e43f..4a60a29 100644 --- a/mix.exs +++ b/mix.exs @@ -10,9 +10,9 @@ defmodule Dispatcher.Mixfile do # Type `mix help compile.app` for more information def application do [ - extra_applications: [:logger, :plug_mint_proxy, :cowboy, :plug, :accept], + extra_applications: [:logger, :plug_mint_proxy, :cowboy, :plug], mod: {MuDispatcher, []}, - env: [] + env: [], ] end @@ -27,9 +27,14 @@ defmodule Dispatcher.Mixfile do # Type `mix help deps` for more examples and options defp deps do [ - {:plug_mint_proxy, git: "https://github.com/madnificent/plug-mint-proxy.git", tag: "v0.0.2"}, + {:plug_mint_proxy, + git: "https://github.com/madnificent/plug-mint-proxy.git", tag: "v0.0.2"}, + # {:plug, "~> 1.10.4"}, + {:plug_cowboy, "~> 2.4.0"}, + {:gun, "~> 2.0.0-rc.2"}, {:accept, "~> 0.3.5"}, - {:observer_cli, "~> 1.5"} + {:observer_cli, "~> 1.5"}, + {:exsync, "~> 0.2", only: :dev} ] end end diff --git a/mix.lock b/mix.lock index 7b7ac0c..167fc04 100644 --- a/mix.lock +++ b/mix.lock @@ -1,15 +1,20 @@ %{ - "accept": {:hex, :accept, "0.3.5", "b33b127abca7cc948bbe6caa4c263369abf1347cfa9d8e699c6d214660f10cd1", [:rebar3], [], "hexpm"}, - "castore": {:hex, :castore, "0.1.3", "61d720c168d8e3a7d96f188f73d50d7ec79aa619cdabf0acd3782b01ff3a9f10", [:mix], [], "hexpm"}, - "cowboy": {:hex, :cowboy, "2.6.3", "99aa50e94e685557cad82e704457336a453d4abcb77839ad22dbe71f311fcc06", [:rebar3], [{:cowlib, "~> 2.7.3", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm"}, - "cowlib": {:hex, :cowlib, "2.7.3", "a7ffcd0917e6d50b4d5fb28e9e2085a0ceb3c97dea310505f7460ff5ed764ce9", [:rebar3], [], "hexpm"}, - "mime": {:hex, :mime, "1.3.1", "30ce04ab3175b6ad0bdce0035cba77bba68b813d523d1aac73d9781b4d193cf8", [:mix], [], "hexpm"}, - "mint": {:hex, :mint, "0.4.0", "b93a10192957624ed4a8b8641eff1819019c36487bdf49e2b505afd2cc9b7911", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm"}, - "observer_cli": {:hex, :observer_cli, "1.5.3", "d42e20054116c49d5242d3ff9e1913acccebe6015f449d6e312a5bc160e79a62", [:mix, :rebar3], [{:recon, "~>2.5.0", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm"}, - "plug": {:hex, :plug, "1.8.3", "12d5f9796dc72e8ac9614e94bda5e51c4c028d0d428e9297650d09e15a684478", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm"}, - "plug_cowboy": {:hex, :plug_cowboy, "2.1.0", "b75768153c3a8a9e8039d4b25bb9b14efbc58e9c4a6e6a270abff1cd30cbe320", [:mix], [{:cowboy, "~> 2.5", [hex: :cowboy, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, - "plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"}, + "accept": {:hex, :accept, "0.3.5", "b33b127abca7cc948bbe6caa4c263369abf1347cfa9d8e699c6d214660f10cd1", [:rebar3], [], "hexpm", "11b18c220bcc2eab63b5470c038ef10eb6783bcb1fcdb11aa4137defa5ac1bb8"}, + "castore": {:hex, :castore, "0.1.11", "c0665858e0e1c3e8c27178e73dffea699a5b28eb72239a3b2642d208e8594914", [:mix], [], "hexpm", "91b009ba61973b532b84f7c09ce441cba7aa15cb8b006cf06c6f4bba18220081"}, + "cowboy": {:hex, :cowboy, "2.9.0", "865dd8b6607e14cf03282e10e934023a1bd8be6f6bacf921a7e2a96d800cd452", [:make, :rebar3], [{:cowlib, "2.11.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "2c729f934b4e1aa149aff882f57c6372c15399a20d54f65c8d67bef583021bde"}, + "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.3.1", "ebd1a1d7aff97f27c66654e78ece187abdc646992714164380d8a041eda16754", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3a6efd3366130eab84ca372cbd4a7d3c3a97bdfcfb4911233b035d117063f0af"}, + "cowlib": {:hex, :cowlib, "2.11.0", "0b9ff9c346629256c42ebe1eeb769a83c6cb771a6ee5960bd110ab0b9b872063", [:make, :rebar3], [], "hexpm", "2b3e9da0b21c4565751a6d4901c20d1b4cc25cbb7fd50d91d2ab6dd287bc86a9"}, + "exsync": {:hex, :exsync, "0.2.4", "5cdc824553e0f4c4bf60018a9a6bbd5d3b51f93ef8401a0d8545f93127281d03", [:mix], [{:file_system, "~> 0.2", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "f7622d8bb98abbe473aa066ae46f91afdf7a5346b8b89728404f7189d2e80896"}, + "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, + "gun": {:hex, :gun, "2.0.0-rc.2", "7c489a32dedccb77b6e82d1f3c5a7dadfbfa004ec14e322cdb5e579c438632d2", [:make, :rebar3], [{:cowlib, "2.11.0", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "6b9d1eae146410d727140dbf8b404b9631302ecc2066d1d12f22097ad7d254fc"}, + "mime": {:hex, :mime, "1.6.0", "dabde576a497cef4bbdd60aceee8160e02a6c89250d6c0b29e56c0dfb00db3d2", [:mix], [], "hexpm", "31a1a8613f8321143dde1dafc36006a17d28d02bdfecb9e95a880fa7aabd19a7"}, + "mint": {:hex, :mint, "0.4.0", "b93a10192957624ed4a8b8641eff1819019c36487bdf49e2b505afd2cc9b7911", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "7957910c271f6a5df20b7ee61a64d74db4475641aed89076def4cbeb778ac9b4"}, + "observer_cli": {:hex, :observer_cli, "1.6.2", "016588e9a966247401bcbf02976d468f1e6f06891dde44f873c9259c6496cca1", [:mix, :rebar3], [{:recon, "~>2.5.1", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "c23db9e4cca0e849adc42b0a099affb9e6267c5f23a871fc6f144348b249341f"}, + "plug": {:hex, :plug, "1.8.3", "12d5f9796dc72e8ac9614e94bda5e51c4c028d0d428e9297650d09e15a684478", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "164baaeb382d19beee0ec484492aa82a9c8685770aee33b24ec727a0971b34d0"}, + "plug_cowboy": {:hex, :plug_cowboy, "2.4.1", "779ba386c0915027f22e14a48919a9545714f849505fa15af2631a0d298abf0f", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d72113b6dff7b37a7d9b2a5b68892808e3a9a752f2bf7e503240945385b70507"}, + "plug_crypto": {:hex, :plug_crypto, "1.2.2", "05654514ac717ff3a1843204b424477d9e60c143406aa94daf2274fdd280794d", [:mix], [], "hexpm", "87631c7ad914a5a445f0a3809f99b079113ae4ed4b867348dd9eec288cecb6db"}, "plug_mint_proxy": {:git, "https://github.com/madnificent/plug-mint-proxy.git", "9d8d9748b17b79571573bb9882e24d5c516961f2", [tag: "v0.0.2"]}, - "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm"}, - "recon": {:hex, :recon, "2.5.0", "2f7fcbec2c35034bade2f9717f77059dc54eb4e929a3049ca7ba6775c0bd66cd", [:mix, :rebar3], [], "hexpm"}, + "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, + "recon": {:hex, :recon, "2.5.2", "cba53fa8db83ad968c9a652e09c3ed7ddcc4da434f27c3eaa9ca47ffb2b1ff03", [:mix, :rebar3], [], "hexpm", "2c7523c8dee91dff41f6b3d63cba2bd49eb6d2fe5bf1eec0df7f87eb5e230e1c"}, + "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, } From 72d23f6f3d30b6031d37feffa3d5132c6ed91822 Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Wed, 21 Jul 2021 20:16:00 +0200 Subject: [PATCH 2/4] move cowboy ws proxy to seperate git repo --- config/config.exs | 6 +- lib/dispatcher/log.ex | 4 - .../remove_accept_encoding_header.ex | 6 +- lib/matcher.ex | 3 +- lib/mu_dispatcher.ex | 18 ++- lib/plug_router_dispatcher.ex | 2 - lib/websocket_handler.ex | 122 ------------------ mix.exs | 1 + mix.lock | 1 + 9 files changed, 23 insertions(+), 140 deletions(-) delete mode 100644 lib/websocket_handler.ex diff --git a/config/config.exs b/config/config.exs index ebaa566..576f57f 100644 --- a/config/config.exs +++ b/config/config.exs @@ -44,11 +44,7 @@ config :dispatcher, # log whenever a layer starts processing log_layer_start_processing: CH.system_boolean("LOG_LAYER_START_PROCESSING"), # log whenever a layer matched, and if no matching layer was found - log_layer_matching: CH.system_boolean("LOG_LAYER_MATCHING"), - log_ws_all: CH.system_boolean("LOG_WS_ALL"), - log_ws_backend: CH.system_boolean("LOG_WS_BACKEND"), - log_ws_frontend: CH.system_boolean("LOG_WS_FRONTEND"), - log_ws_unhandled: CH.system_boolean("LOG_WS_UNHANDLED") + log_layer_matching: CH.system_boolean("LOG_LAYER_MATCHING") # It is also possible to import configuration files, relative to this # directory. For example, you can emulate configuration per environment diff --git a/lib/dispatcher/log.ex b/lib/dispatcher/log.ex index 1535234..b0a785b 100644 --- a/lib/dispatcher/log.ex +++ b/lib/dispatcher/log.ex @@ -2,10 +2,6 @@ defmodule Dispatcher.Log do @type log_name :: :log_layer_start_processing | :log_layer_matching - | :log_ws_all - | :log_ws_backend - | :log_ws_frontend - | :log_ws_unhandled @spec log(log_name, any()) :: any() def log(name, content) do diff --git a/lib/manipulators/remove_accept_encoding_header.ex b/lib/manipulators/remove_accept_encoding_header.ex index 4e50ea2..f55ad70 100644 --- a/lib/manipulators/remove_accept_encoding_header.ex +++ b/lib/manipulators/remove_accept_encoding_header.ex @@ -3,9 +3,9 @@ defmodule Manipulators.RemoveAcceptEncodingHeader do @impl true def headers(headers, connection) do - # headers = - # headers - # |> Enum.reject( &match?( {"accept_encoding", _}, &1 ) ) + headers = + headers + |> Enum.reject( &match?( {"accept_encoding", _}, &1 ) ) {headers, connection} end diff --git a/lib/matcher.ex b/lib/matcher.ex index e030ff3..058defb 100644 --- a/lib/matcher.ex +++ b/lib/matcher.ex @@ -8,7 +8,6 @@ defmodule Matcher do quote do require Matcher import Matcher - import Plug.Router, only: [forward: 2] import Plug.Conn, only: [send_resp: 3] import Proxy, only: [forward: 3] @@ -478,7 +477,7 @@ defmodule Matcher do defp sort_and_group_accept_headers(accept) do accept |> safe_parse_accept_header() - |> IO.inspect(label: "parsed_accept_header") + # |> IO.inspect(label: "parsed_accept_header") |> Enum.sort_by(&elem(&1, 3)) |> Enum.group_by(&elem(&1, 3)) |> Map.to_list() diff --git a/lib/mu_dispatcher.ex b/lib/mu_dispatcher.ex index 575fb38..bc14a61 100644 --- a/lib/mu_dispatcher.ex +++ b/lib/mu_dispatcher.ex @@ -11,7 +11,7 @@ defmodule MuDispatcher do children = [ # this is kinda strange, but the 'plug:' field is not used when 'dispatch:' is provided (my understanding) - {Plug.Adapters.Cowboy, + {Plug.Cowboy, scheme: :http, plug: PlugRouterDispatcher, options: [dispatch: dispatch, port: port]} ] @@ -21,10 +21,24 @@ defmodule MuDispatcher do end defp dispatch do + default = %{ + host: "localhost", + port: 80, + path: "/" + } + + f = fn req -> + {_, target} = + :cowboy_req.parse_qs(req) + |> Enum.find(fn {head, _} -> head == "target" end) + + Dispatcher.get_websocket(target) + end + [ {:_, [ - {"/ws/[...]", WebsocketHandler, %{}}, + {"/ws/[...]", WsHandler, {f, default}}, {:_, Plug.Cowboy.Handler, {PlugRouterDispatcher, []}} ]} ] diff --git a/lib/plug_router_dispatcher.ex b/lib/plug_router_dispatcher.ex index bd1eaba..5a7b25d 100644 --- a/lib/plug_router_dispatcher.ex +++ b/lib/plug_router_dispatcher.ex @@ -1,5 +1,3 @@ -alias Dispatcher.Log - defmodule PlugRouterDispatcher do use Plug.Router diff --git a/lib/websocket_handler.ex b/lib/websocket_handler.ex deleted file mode 100644 index 63abc19..0000000 --- a/lib/websocket_handler.ex +++ /dev/null @@ -1,122 +0,0 @@ -alias Dispatcher.Log - -defmodule WebsocketHandler do - @behaviour :cowboy_websocket - - def init(req, state) do - # Get path info - {_, target} = - :cowboy_req.parse_qs(req) - |> Enum.find(fn {head, _} -> head == "target" end) - - ws = - Dispatcher.get_websocket(target) - |> Log.inspect(:ws_log_all, label: "websocket connecting to target") - - new_state = - state - |> Map.put(:host, ws.host) - |> Map.put(:path, ws.path) - |> Map.put(:port, ws.port) - |> Map.put(:ready, false) - |> Map.put(:buffer, []) - - {:cowboy_websocket, req, new_state} - end - - def websocket_init(state) do - Log.inspect(state, :log_ws_all, label: "websocket all start connect with") - - connect_opts = %{ - connect_timeout: :timer.minutes(1), - retry: 10, - retry_timeout: 300 - } - - # conn :: pid() - {:ok, conn} = :gun.open(to_charlist(state.host), state.port, connect_opts) - {:ok, :http} = :gun.await_up(conn) - - # streamref :: StreamRef - streamref = :gun.ws_upgrade(conn, to_charlist(state.path)) - - new_state = - state - |> Map.put(:back_pid, conn) - |> Map.put(:back_ref, streamref) - - {:ok, new_state} - end - - def websocket_handle(message, state) do - new_state = - if state.ready do - Log.inspect(message, :log_ws_frontend, label: "websocket frontend message") - |> Log.inspect(:log_ws_all, label: "websocket all frontend message") - - :ok = :gun.ws_send(state.back_pid, state.back_ref, message) - state - else - Log.inspect(message, :log_ws_frontend, - label: "websocket frontend message postponed (connection not started)" - ) - |> Log.inspect(:log_ws_all, - label: "websocket all frontend message postponed (connection not started)" - ) - - buf = [message | state.buffer] - Map.put(state, :buffer, buf) - end - - {:ok, new_state} - end - - def websocket_info({:gun_ws, _pid, _ref, msg}, state) do - Log.inspect(msg, :log_ws_backend, label: "websocket backend message") - |> Log.inspect(:log_ws_all, label: "websocket all backend message") - - {:reply, msg, state} - end - - def websocket_info({:gun_error, _gun_pid, _stream_ref, reason}, _state) do - exit({:ws_upgrade_failed, reason}) - end - - def websocket_info({:gun_response, _gun_pid, _, _, status, headers}, _state) do - Log.inspect({"Websocket upgrade failed.", headers}, :log_ws_all, label: "websocket all") - exit({:ws_upgrade_failed, status, headers}) - end - - def websocket_info({:gun_upgrade, _, _, ["websocket"], headers}, state) do - Log.inspect("ws upgrade succesful", :log_ws_all, label: "websocket all") - Log.inspect(headers, :log_ws_all, label: "websocket all") - - state.buffer - |> Enum.reverse() - |> Enum.each(fn x -> - Log.inspect(x, :log_ws_frontend, label: "postponed sending message") - Log.inspect(x, :log_ws_all, label: "postponed sending message") - :gun.ws_send(state.back_pid, state.back_ref, x) - end) - - new_state = - state - |> Map.put(:ready, true) - |> Map.put(:buffer, []) - - {:ok, new_state} - end - - def websocket_info(info, state) do - Log.inspect(info, :log_ws_unhandled, label: "websocket unhandled info") - |> Log.inspect(:log_ws_all, label: "websocket all info") - - {:ok, state} - end - - def terminate(_reason, _req, state) do - Log.inspect("Closing", :log_ws_all, label: "websocket all") - :gun.shutdown(state.back_pid) - :ok - end -end diff --git a/mix.exs b/mix.exs index 4a60a29..bac50aa 100644 --- a/mix.exs +++ b/mix.exs @@ -27,6 +27,7 @@ defmodule Dispatcher.Mixfile do # Type `mix help deps` for more examples and options defp deps do [ + {:cowboy_ws_proxy, git: "https://github.com/ajuvercr/elixir-cowboy-ws-proxy-handler.git", tag: "v0.1"}, {:plug_mint_proxy, git: "https://github.com/madnificent/plug-mint-proxy.git", tag: "v0.0.2"}, # {:plug, "~> 1.10.4"}, diff --git a/mix.lock b/mix.lock index 167fc04..909cdcb 100644 --- a/mix.lock +++ b/mix.lock @@ -3,6 +3,7 @@ "castore": {:hex, :castore, "0.1.11", "c0665858e0e1c3e8c27178e73dffea699a5b28eb72239a3b2642d208e8594914", [:mix], [], "hexpm", "91b009ba61973b532b84f7c09ce441cba7aa15cb8b006cf06c6f4bba18220081"}, "cowboy": {:hex, :cowboy, "2.9.0", "865dd8b6607e14cf03282e10e934023a1bd8be6f6bacf921a7e2a96d800cd452", [:make, :rebar3], [{:cowlib, "2.11.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "2c729f934b4e1aa149aff882f57c6372c15399a20d54f65c8d67bef583021bde"}, "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.3.1", "ebd1a1d7aff97f27c66654e78ece187abdc646992714164380d8a041eda16754", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3a6efd3366130eab84ca372cbd4a7d3c3a97bdfcfb4911233b035d117063f0af"}, + "cowboy_ws_proxy": {:git, "https://github.com/ajuvercr/elixir-cowboy-ws-proxy-handler.git", "e015e27775af30d4e3d7ca5629d97191cca61555", [tag: "v0.1"]}, "cowlib": {:hex, :cowlib, "2.11.0", "0b9ff9c346629256c42ebe1eeb769a83c6cb771a6ee5960bd110ab0b9b872063", [:make, :rebar3], [], "hexpm", "2b3e9da0b21c4565751a6d4901c20d1b4cc25cbb7fd50d91d2ab6dd287bc86a9"}, "exsync": {:hex, :exsync, "0.2.4", "5cdc824553e0f4c4bf60018a9a6bbd5d3b51f93ef8401a0d8545f93127281d03", [:mix], [{:file_system, "~> 0.2", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "f7622d8bb98abbe473aa066ae46f91afdf7a5346b8b89728404f7189d2e80896"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, From 5f6e408d7a55c1b7580ff59b20e19ba7aad8656c Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Thu, 9 Sep 2021 15:34:28 +0200 Subject: [PATCH 3/4] update README --- README.md | 40 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 3f7a479..5e7fe84 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ The Dispatcher runs as an application in which the `Dispatcher` module is overri The disptacher is configured using the dispatcher.ex file in a [mu-project](https://github.com/mu-semtech/mu-project). -The basic (default) configuration of the mu-dispatcher is an Elixir module named `Dispatcher` which uses the `Matcher` functionality. +The basic (default) configuration of the mu-dispatcher is an Elixir module named `Dispatcher` which uses the `Matcher` functionality. An empty set of accept types is required (`define_accept_types []`). ```elixir @@ -251,6 +251,40 @@ If you need to access a part of the API, revert back to the array syntax and def This specific implementation does require at least one subdomain and it will thus not match `redpencil.io`. +### Forwarding websockets + +Dispatcher can forward websocket tunnels. + +An example rule is as follows: + +```elixir + match "/websocket" do + ws(conn, "ws://localhost:7999") + end +``` + +Any websocket connections on `/websocket` get's redirected to `/ws?target=`. +The Gun library listens to websockets on `/ws` which makes everything work. +The `target` query parameter tells Gun what host to forward to. + + +Note: following redirects is not required in the websockets specs, and most browsers don't support this. + +Example workarounds: +```js +async function createRedirectedWebsocket(url) { + // This prints an error to the console due to unexpected upgrade response + const resp = await fetch(url); + const ws_url = resp.url.replace(/^http/, 'ws'); + return new WebSocket(ws_url); +} +``` + +With the 'ws' npm package it is possible to set a `followRedirects` flag. +```js +const WebSocket = require('ws') +const ws = new WebSocket('ws://localhost/ws2', options={'followRedirects': true}); +``` ### Fallback routes and 404 pages @@ -447,7 +481,7 @@ Forwarding connections is built on top of `plug_mint_proxy` which uses the Mint ### Wiring with Plug [Plug](https://github.com/elixir-plug/plug) expects call to be matched using its own matcher and dispatcher. -This library provides some extra support. +This library provides some extra support. Although tying this in within Plug might be simple, the request is dispatched to our own matcher in [plug_router_dispatcher.ex](./lib/plug_router_dispatcher.ex). ### Header manipulation @@ -470,5 +504,3 @@ High-level the dispatching works as follows: 4. For each (B) try to find a matched solution 5. If a solution is found, return it 6. If no solution is found, try to find a matched solution with the `last_call` option set to true - - From ae1078ef5bd158c0140007aca1ea794e4de14bbb Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Fri, 17 Sep 2021 12:51:37 +0200 Subject: [PATCH 4/4] move /ws to /.mu/ws + update README --- README.md | 6 +++--- lib/dispatcher.ex | 6 +++--- lib/matcher.ex | 2 +- lib/mu_dispatcher.ex | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 5e7fe84..e2a09be 100644 --- a/README.md +++ b/README.md @@ -259,12 +259,12 @@ An example rule is as follows: ```elixir match "/websocket" do - ws(conn, "ws://localhost:7999") + ws conn, "ws://push-service-ws/" end ``` -Any websocket connections on `/websocket` get's redirected to `/ws?target=`. -The Gun library listens to websockets on `/ws` which makes everything work. +Any websocket connections on `/websocket` get's redirected to `/.mu/ws?target=`. +The Gun library listens to websockets on `/.mu/ws` which makes everything work. The `target` query parameter tells Gun what host to forward to. diff --git a/lib/dispatcher.ex b/lib/dispatcher.ex index be884b4..ab11135 100644 --- a/lib/dispatcher.ex +++ b/lib/dispatcher.ex @@ -41,10 +41,10 @@ defmodule Dispatcher do # Websocket example route # This forwards to /ws?target=<...> - # Then forwards websocket from /ws?target=<...> to ws://localhost:7999 + # Then forwards websocket from /ws?target=<...> to ws://push-service-ws/ - match "/ws2" do - ws(conn, "ws://localhost:7999") + match "/push-service/ws" do + ws conn, "ws://push-service-ws/" end diff --git a/lib/matcher.ex b/lib/matcher.ex index 058defb..7515d00 100644 --- a/lib/matcher.ex +++ b/lib/matcher.ex @@ -56,7 +56,7 @@ defmodule Matcher do quote do unquote(conn) |> Plug.Conn.resp(:found, "") - |> Plug.Conn.put_resp_header("location", "/ws?target=" <> unquote(id)) + |> Plug.Conn.put_resp_header("location", "/.mu/ws?target=" <> unquote(id)) end end diff --git a/lib/mu_dispatcher.ex b/lib/mu_dispatcher.ex index bc14a61..1b834cb 100644 --- a/lib/mu_dispatcher.ex +++ b/lib/mu_dispatcher.ex @@ -38,7 +38,7 @@ defmodule MuDispatcher do [ {:_, [ - {"/ws/[...]", WsHandler, {f, default}}, + {"/.mu/ws/[...]", WsHandler, {f, default}}, {:_, Plug.Cowboy.Handler, {PlugRouterDispatcher, []}} ]} ]