lib/fluent/plugin/out_sakuraio.rb in fluent-plugin-sakuraio-0.1.1 vs lib/fluent/plugin/out_sakuraio.rb in fluent-plugin-sakuraio-0.1.2
- old
+ new
@@ -12,11 +12,10 @@
config_param :channels, :hash
def configure(conf)
super
- @client = Faye::WebSocket::Client.new(@url)
ensure_reactor_running
thread_create(:out_sakuraio, &method(:run))
end
def ensure_reactor_running
@@ -25,15 +24,20 @@
EM.run
end
end
def run()
+ @client = Faye::WebSocket::Client.new(@url)
EM.next_tick do
@client.on :open do
log.info "sakuraio: starting websocket connection for #{@url}."
end
+ @client.on :message do |event|
+ log.debug "sakuraio: received message #{event.data}"
+ end
+
@client.on :error do |event|
log.warn "sakuraio: #{event.message}"
end
@client.on :close do
@@ -43,22 +47,27 @@
end
def process(_tag, es)
es.each do |_time, record|
log.debug "sakuraio: process record #{record}"
- @client.send(encode_record(record))
+ modules.each do |m|
+ s = encode_record(m, record)
+ log.debug "sakuraio: encoded json #{s}"
+ @client.send(s)
+ end
end
end
- def encode_record(record)
+ def encode_record(m, record)
data = []
@channels.each do |ch, v|
key, type = v
- data.push('channel' => ch,
+ data.push('channel' => ch.to_i,
'type' => type,
'value' => record[key])
end
hash = { 'type' => 'channels',
+ 'module' => m,
'payload' => { 'channels' => data } }
Yajl::Encoder.encode(hash)
end
end
end