lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.1.3 vs lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.2.0
- old
+ new
@@ -1,8 +1,9 @@
module Fluent
class RedisOutput < BufferedOutput
Fluent::Plugin.register_output('redis', self)
+ attr_reader :host, :port, :db_number, :redis
def initialize
super
require 'redis'
require 'msgpack'
@@ -10,44 +11,39 @@
def configure(conf)
super
@host = conf.has_key?('host') ? conf['host'] : 'localhost'
- @port = conf.has_key?('port') ? conf['port'] : 6379
- @db = conf.has_key?('db') ? conf['db'] : nil
+ @port = conf.has_key?('port') ? conf['port'].to_i : 6379
+ @db_number = conf.has_key?('db_number') ? conf['db_number'].to_i : nil
if conf.has_key?('namespace')
- $log.warn "Namespace option has been removed from fluent-plugin-radis 0.1.1. Please add or remove the namespace '#{conf['namespace']}' manually."
+ $log.warn "namespace option has been removed from fluent-plugin-redis 0.1.3. Please add or remove the namespace '#{conf['namespace']}' manually."
end
end
def start
super
@redis = Redis.new(:host => @host, :port => @port,
- :thread_safe => true, :db => @db)
+ :thread_safe => true, :db => @db_number)
end
def shutdown
@redis.quit
end
- def format(tag, event)
- # event.record[:identifier]=[tag,event.time].join(".")
- # event.record.to_msgpack
- identifier=[tag,event.time].join(".")
- [ identifier, event.record ].to_msgpack
+ def format(tag, time, record)
+ identifier = [tag, time].join(".")
+ [identifier, record].to_msgpack
end
def write(chunk)
@redis.pipelined {
chunk.open { |io|
begin
- MessagePack::Unpacker.new(io).each { |record|
- # identifier = record["identifier"].to_s
- # record.delete("identifier")
- # @redis.mapped_hmset identifier, record
- @redis.mapped_hmset record[0], record[1]
+ MessagePack::Unpacker.new(io).each.each_with_index { |record, index|
+ @redis.mapped_hmset "#{record[0]}.#{index}", record[1]
}
rescue EOFError
# EOFError always occured when reached end of chunk.
end
}