lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.3.0 vs lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.3.1
- old
+ new
@@ -1,14 +1,11 @@
require 'redis'
require 'msgpack'
-require 'fluent/msgpack_factory'
require 'fluent/plugin/output'
module Fluent::Plugin
class RedisOutput < Output
- include Fluent::MessagePackFactory::Mixin
-
Fluent::Plugin.register_output('redis', self)
helpers :compat_parameters, :inject
DEFAULT_BUFFER_TYPE = "memory"
@@ -36,10 +33,11 @@
if conf.has_key?('namespace')
log.warn "namespace option has been removed from fluent-plugin-redis 0.1.3. Please add or remove the namespace '#{conf['namespace']}' manually."
end
raise Fluent::ConfigError, "'tag' in chunk_keys is required." if not @chunk_key_tag
raise Fluent::ConfigError, "'time' in chunk_keys is required." if not @chunk_key_time
+ @unpacker = Fluent::Engine.msgpack_factory.unpacker
end
def start
super
@@ -70,18 +68,13 @@
def write(chunk)
tag, time = expand_placeholders(chunk.metadata)
@redis.pipelined {
unless @allow_duplicate_key
- chunk.open { |io|
- begin
- msgpack_unpacker(io).each.with_index { |record, index|
- identifier = [tag, time].join(".")
- @redis.mapped_hmset "#{identifier}.#{index}", record[2]
- }
- rescue EOFError
- # EOFError always occured when reached end of chunk.
- end
+ stream = chunk.to_msgpack_stream
+ @unpacker.feed_each(stream).with_index { |record, index|
+ identifier = [tag, time].join(".")
+ @redis.mapped_hmset "#{identifier}.#{index}", record[2]
}
else
chunk.each do |_tag, _time, record|
@redis.mapped_hmset "#{tag}", record
end