lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.3.1 vs lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.3.2

- old
+ new

@@ -7,20 +7,22 @@ Fluent::Plugin.register_output('redis', self) helpers :compat_parameters, :inject DEFAULT_BUFFER_TYPE = "memory" + DEFAULT_TTL_VALUE = -1 attr_reader :redis config_param :host, :string, default: 'localhost' config_param :port, :integer, default: 6379 config_param :db_number, :integer, default: 0 config_param :password, :string, default: nil, secret: true config_param :insert_key_prefix, :string, default: "${tag}" config_param :strftime_format, :string, default: "%s" config_param :allow_duplicate_key, :bool, default: false + config_param :ttl, :integer, default: DEFAULT_TTL_VALUE config_section :buffer do config_set_default :@type, DEFAULT_BUFFER_TYPE config_set_default :chunk_keys, ['tag', 'time'] config_set_default :timekey, 60 @@ -71,14 +73,20 @@ @redis.pipelined { unless @allow_duplicate_key stream = chunk.to_msgpack_stream @unpacker.feed_each(stream).with_index { |record, index| identifier = [tag, time].join(".") - @redis.mapped_hmset "#{identifier}.#{index}", record[2] + @redis.multi do + @redis.mapped_hmset "#{identifier}.#{index}", record[2] + @redis.expire "#{identifier}.#{index}", @ttl if @ttl > 0 + end } else chunk.each do |_tag, _time, record| - @redis.mapped_hmset "#{tag}", record + @redis.multi do + @redis.mapped_hmset "#{tag}", record + @redis.expire "#{tag}", @ttl if @ttl > 0 + end end end } end