lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.3.1 vs lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.3.2
- old
+ new
@@ -7,20 +7,22 @@
Fluent::Plugin.register_output('redis', self)
helpers :compat_parameters, :inject
DEFAULT_BUFFER_TYPE = "memory"
+ DEFAULT_TTL_VALUE = -1
attr_reader :redis
config_param :host, :string, default: 'localhost'
config_param :port, :integer, default: 6379
config_param :db_number, :integer, default: 0
config_param :password, :string, default: nil, secret: true
config_param :insert_key_prefix, :string, default: "${tag}"
config_param :strftime_format, :string, default: "%s"
config_param :allow_duplicate_key, :bool, default: false
+ config_param :ttl, :integer, default: DEFAULT_TTL_VALUE
config_section :buffer do
config_set_default :@type, DEFAULT_BUFFER_TYPE
config_set_default :chunk_keys, ['tag', 'time']
config_set_default :timekey, 60
@@ -71,14 +73,20 @@
@redis.pipelined {
unless @allow_duplicate_key
stream = chunk.to_msgpack_stream
@unpacker.feed_each(stream).with_index { |record, index|
identifier = [tag, time].join(".")
- @redis.mapped_hmset "#{identifier}.#{index}", record[2]
+ @redis.multi do
+ @redis.mapped_hmset "#{identifier}.#{index}", record[2]
+ @redis.expire "#{identifier}.#{index}", @ttl if @ttl > 0
+ end
}
else
chunk.each do |_tag, _time, record|
- @redis.mapped_hmset "#{tag}", record
+ @redis.multi do
+ @redis.mapped_hmset "#{tag}", record
+ @redis.expire "#{tag}", @ttl if @ttl > 0
+ end
end
end
}
end