lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.3.0 vs lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.3.1

- old
+ new

@@ -1,14 +1,11 @@ require 'redis' require 'msgpack' -require 'fluent/msgpack_factory' require 'fluent/plugin/output' module Fluent::Plugin class RedisOutput < Output - include Fluent::MessagePackFactory::Mixin - Fluent::Plugin.register_output('redis', self) helpers :compat_parameters, :inject DEFAULT_BUFFER_TYPE = "memory" @@ -36,10 +33,11 @@ if conf.has_key?('namespace') log.warn "namespace option has been removed from fluent-plugin-redis 0.1.3. Please add or remove the namespace '#{conf['namespace']}' manually." end raise Fluent::ConfigError, "'tag' in chunk_keys is required." if not @chunk_key_tag raise Fluent::ConfigError, "'time' in chunk_keys is required." if not @chunk_key_time + @unpacker = Fluent::Engine.msgpack_factory.unpacker end def start super @@ -70,18 +68,13 @@ def write(chunk) tag, time = expand_placeholders(chunk.metadata) @redis.pipelined { unless @allow_duplicate_key - chunk.open { |io| - begin - msgpack_unpacker(io).each.with_index { |record, index| - identifier = [tag, time].join(".") - @redis.mapped_hmset "#{identifier}.#{index}", record[2] - } - rescue EOFError - # EOFError always occured when reached end of chunk. - end + stream = chunk.to_msgpack_stream + @unpacker.feed_each(stream).with_index { |record, index| + identifier = [tag, time].join(".") + @redis.mapped_hmset "#{identifier}.#{index}", record[2] } else chunk.each do |_tag, _time, record| @redis.mapped_hmset "#{tag}", record end