diff --git a/lib/fluent/plugin/out_websocket.rb b/lib/fluent/plugin/out_websocket.rb index 3a8723b..6cf97e8 100644 --- a/lib/fluent/plugin/out_websocket.rb +++ b/lib/fluent/plugin/out_websocket.rb @@ -15,6 +15,11 @@ require 'em-websocket' require 'thread' + +class EventMachine::WebSocket::Connection + attr_accessor :tags +end + module Fluent $lock = Mutex::new $channel = EM::Channel.new @@ -34,8 +39,19 @@ def configure(conf) $log.info "WebSocket server #{@host}:#{@port} [msgpack: #{@use_msgpack}]" EM.run { EM::WebSocket.run(:host => @host, :port => @port) do |ws| + ws.tags = [] ws.onopen { |handshake| - callback = @use_msgpack ? proc{|msg| ws.send_binary(msg)} : proc{|msg| ws.send(msg)} + callback = @use_msgpack ? proc{|msg| + if (ws.tags.empty? || (ws.tags.include? msg[0])) + ws.send_binary(msg[1]) + end + } + : proc{|msg| + if (ws.tags.empty? || (ws.tags.include? msg[0])) + ws.send(msg[1]) + end + } + $lock.synchronize do sid = $channel.subscribe callback $log.trace "WebSocket connection: ID " + sid.to_s @@ -47,8 +63,21 @@ def configure(conf) } end - #ws.onmessage { |msg| - #} + ws.onmessage { |msg| + if (msg == "reset") + ws.tags.clear() + elsif (msg.start_with? "add ") + tag = msg[4..-1] + + if !ws.tags.include? tag + ws.tags << tag + end + elsif (msg.start_with? "del ") + ws.tags.delete(msg[4..-1]) + end + + $log.info "msg from client: #{msg} final tag #{ws.tags}" + } } end } @@ -74,7 +103,7 @@ def emit(tag, es, chain) if (@add_tag) then data.unshift(tag) end output = @use_msgpack ? data.to_msgpack : data.to_json $lock.synchronize do - $channel.push output + $channel.push [tag, output] end } end