lib/fluent/plugin/out_sakuraio.rb in fluent-plugin-sakuraio-0.1.2 vs lib/fluent/plugin/out_sakuraio.rb in fluent-plugin-sakuraio-0.1.3
- old
+ new
@@ -1,5 +1,7 @@
+# frozen_string_literal: true
+
require 'fluent/plugin/output'
require 'yajl'
require 'faye/websocket'
module Fluent::Plugin
@@ -18,16 +20,17 @@
thread_create(:out_sakuraio, &method(:run))
end
def ensure_reactor_running
return if EM.reactor_running?
+
thread_create(:out_sakuraio_reactor) do
EM.run
end
end
- def run()
+ def run
@client = Faye::WebSocket::Client.new(@url)
EM.next_tick do
@client.on :open do
log.info "sakuraio: starting websocket connection for #{@url}."
end
@@ -44,30 +47,30 @@
@client = nil
end
end
end
- def process(_tag, es)
- es.each do |_time, record|
+ def process(_tag, events)
+ events.each do |_time, record|
log.debug "sakuraio: process record #{record}"
modules.each do |m|
s = encode_record(m, record)
log.debug "sakuraio: encoded json #{s}"
@client.send(s)
end
end
end
- def encode_record(m, record)
+ def encode_record(mod, record)
data = []
@channels.each do |ch, v|
key, type = v
data.push('channel' => ch.to_i,
'type' => type,
'value' => record[key])
end
hash = { 'type' => 'channels',
- 'module' => m,
+ 'module' => mod,
'payload' => { 'channels' => data } }
Yajl::Encoder.encode(hash)
end
end
end