lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.1.3 vs lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.2.0

- old
+ new

@@ -1,8 +1,9 @@ module Fluent class RedisOutput < BufferedOutput Fluent::Plugin.register_output('redis', self) + attr_reader :host, :port, :db_number, :redis def initialize super require 'redis' require 'msgpack' @@ -10,44 +11,39 @@ def configure(conf) super @host = conf.has_key?('host') ? conf['host'] : 'localhost' - @port = conf.has_key?('port') ? conf['port'] : 6379 - @db = conf.has_key?('db') ? conf['db'] : nil + @port = conf.has_key?('port') ? conf['port'].to_i : 6379 + @db_number = conf.has_key?('db_number') ? conf['db_number'].to_i : nil if conf.has_key?('namespace') - $log.warn "Namespace option has been removed from fluent-plugin-radis 0.1.1. Please add or remove the namespace '#{conf['namespace']}' manually." + $log.warn "namespace option has been removed from fluent-plugin-redis 0.1.3. Please add or remove the namespace '#{conf['namespace']}' manually." end end def start super @redis = Redis.new(:host => @host, :port => @port, - :thread_safe => true, :db => @db) + :thread_safe => true, :db => @db_number) end def shutdown @redis.quit end - def format(tag, event) - # event.record[:identifier]=[tag,event.time].join(".") - # event.record.to_msgpack - identifier=[tag,event.time].join(".") - [ identifier, event.record ].to_msgpack + def format(tag, time, record) + identifier = [tag, time].join(".") + [identifier, record].to_msgpack end def write(chunk) @redis.pipelined { chunk.open { |io| begin - MessagePack::Unpacker.new(io).each { |record| - # identifier = record["identifier"].to_s - # record.delete("identifier") - # @redis.mapped_hmset identifier, record - @redis.mapped_hmset record[0], record[1] + MessagePack::Unpacker.new(io).each.each_with_index { |record, index| + @redis.mapped_hmset "#{record[0]}.#{index}", record[1] } rescue EOFError # EOFError always occured when reached end of chunk. end }