lib/fluent/plugin/out_sakuraio.rb in fluent-plugin-sakuraio-0.1.1 vs lib/fluent/plugin/out_sakuraio.rb in fluent-plugin-sakuraio-0.1.2

- old
+ new

@@ -12,11 +12,10 @@ config_param :channels, :hash def configure(conf) super - @client = Faye::WebSocket::Client.new(@url) ensure_reactor_running thread_create(:out_sakuraio, &method(:run)) end def ensure_reactor_running @@ -25,15 +24,20 @@ EM.run end end def run() + @client = Faye::WebSocket::Client.new(@url) EM.next_tick do @client.on :open do log.info "sakuraio: starting websocket connection for #{@url}." end + @client.on :message do |event| + log.debug "sakuraio: received message #{event.data}" + end + @client.on :error do |event| log.warn "sakuraio: #{event.message}" end @client.on :close do @@ -43,22 +47,27 @@ end def process(_tag, es) es.each do |_time, record| log.debug "sakuraio: process record #{record}" - @client.send(encode_record(record)) + modules.each do |m| + s = encode_record(m, record) + log.debug "sakuraio: encoded json #{s}" + @client.send(s) + end end end - def encode_record(record) + def encode_record(m, record) data = [] @channels.each do |ch, v| key, type = v - data.push('channel' => ch, + data.push('channel' => ch.to_i, 'type' => type, 'value' => record[key]) end hash = { 'type' => 'channels', + 'module' => m, 'payload' => { 'channels' => data } } Yajl::Encoder.encode(hash) end end end