From b920b09d37dae5955f6d061195c9cc51864c2129 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=99=E5=BF=97=E6=96=87?= Date: Wed, 29 May 2013 14:29:48 +0800 Subject: [PATCH 1/3] add filter by tag support --- lib/fluent/plugin/out_websocket.rb | 35 ++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin/out_websocket.rb b/lib/fluent/plugin/out_websocket.rb index 3a8723b..8568543 100644 --- a/lib/fluent/plugin/out_websocket.rb +++ b/lib/fluent/plugin/out_websocket.rb @@ -15,6 +15,11 @@ require 'em-websocket' require 'thread' + +class EventMachine::WebSocket::Connection + attr_accessor :tags +end + module Fluent $lock = Mutex::new $channel = EM::Channel.new @@ -34,8 +39,17 @@ def configure(conf) $log.info "WebSocket server #{@host}:#{@port} [msgpack: #{@use_msgpack}]" EM.run { EM::WebSocket.run(:host => @host, :port => @port) do |ws| + ws.tags = [] ws.onopen { |handshake| - callback = @use_msgpack ? proc{|msg| ws.send_binary(msg)} : proc{|msg| ws.send(msg)} + callback = @use_msgpack ? proc{|msg| + if ( ws.tags.include? msg[0]) then + ws.send_binary(msg[1]) + end + } + : proc{|msg| + ws.send(msg) + } + $lock.synchronize do sid = $channel.subscribe callback $log.trace "WebSocket connection: ID " + sid.to_s @@ -47,8 +61,21 @@ def configure(conf) } end - #ws.onmessage { |msg| - #} + ws.onmessage { |msg| + if (msg == "reset") + ws.tags.clear() + elsif (msg.start_with? "add ") + tag = msg[4..-1] + + if !ws.tags.include? tag + ws.tags << tag + end + elsif (msg.start_with? "del ") + ws.tags.delete(msg[4..-1]) + end + + $log.info "msg from client: #{msg} final tag #{ws.tags}" + } } end } @@ -74,7 +101,7 @@ def emit(tag, es, chain) if (@add_tag) then data.unshift(tag) end output = @use_msgpack ? data.to_msgpack : data.to_json $lock.synchronize do - $channel.push output + $channel.push [tag, output] end } end From f179f270c2aa37823c1ffa6660459b9a28540cce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=99=E5=BF=97=E6=96=87?= Date: Wed, 29 May 2013 16:45:31 +0800 Subject: [PATCH 2/3] send all data in default --- lib/fluent/plugin/out_websocket.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/out_websocket.rb b/lib/fluent/plugin/out_websocket.rb index 8568543..cb136f7 100644 --- a/lib/fluent/plugin/out_websocket.rb +++ b/lib/fluent/plugin/out_websocket.rb @@ -42,8 +42,8 @@ def configure(conf) ws.tags = [] ws.onopen { |handshake| callback = @use_msgpack ? proc{|msg| - if ( ws.tags.include? msg[0]) then - ws.send_binary(msg[1]) + if (ws.tags.empty? || (ws.tags.include? msg[0])) + ws.send_binary(msg[1]) end } : proc{|msg| From 20823b8313211b126d8c12ae39ff44368587b66e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=99=E5=BF=97=E6=96=87?= Date: Thu, 30 May 2013 10:28:00 +0800 Subject: [PATCH 3/3] add tag filter support for use_javascript --- lib/fluent/plugin/out_websocket.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/out_websocket.rb b/lib/fluent/plugin/out_websocket.rb index cb136f7..6cf97e8 100644 --- a/lib/fluent/plugin/out_websocket.rb +++ b/lib/fluent/plugin/out_websocket.rb @@ -47,7 +47,9 @@ def configure(conf) end } : proc{|msg| - ws.send(msg) + if (ws.tags.empty? || (ws.tags.include? msg[0])) + ws.send(msg[1]) + end } $lock.synchronize do