lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.2.3 vs lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.3.0

- old
+ new

@@ -1,42 +1,55 @@ -module Fluent - class RedisOutput < BufferedOutput +require 'redis' +require 'msgpack' +require 'fluent/msgpack_factory' +require 'fluent/plugin/output' + +module Fluent::Plugin + class RedisOutput < Output + include Fluent::MessagePackFactory::Mixin + Fluent::Plugin.register_output('redis', self) + + helpers :compat_parameters, :inject + + DEFAULT_BUFFER_TYPE = "memory" + attr_reader :redis - config_param :host, :string, :default => 'localhost' - config_param :port, :integer, :default => 6379 - config_param :db_number, :integer, :default => 0 - config_param :password, :string, :default => nil, :secret => true + config_param :host, :string, default: 'localhost' + config_param :port, :integer, default: 6379 + config_param :db_number, :integer, default: 0 + config_param :password, :string, default: nil, secret: true + config_param :insert_key_prefix, :string, default: "${tag}" + config_param :strftime_format, :string, default: "%s" + config_param :allow_duplicate_key, :bool, default: false - # To support log_level option implemented by Fluentd v0.10.43 - unless method_defined?(:log) - define_method("log") { $log } + config_section :buffer do + config_set_default :@type, DEFAULT_BUFFER_TYPE + config_set_default :chunk_keys, ['tag', 'time'] + config_set_default :timekey, 60 end - def initialize - super - require 'redis' - require 'msgpack' - end - def configure(conf) + compat_parameters_convert(conf, :buffer, :inject) super if conf.has_key?('namespace') log.warn "namespace option has been removed from fluent-plugin-redis 0.1.3. Please add or remove the namespace '#{conf['namespace']}' manually." end + raise Fluent::ConfigError, "'tag' in chunk_keys is required." if not @chunk_key_tag + raise Fluent::ConfigError, "'time' in chunk_keys is required." if not @chunk_key_time end def start super options = { - :host => @host, - :port => @port, - :thread_safe => true, - :db => @db_number + host: @host, + port: @port, + thread_safe: true, + db: @db_number } options[:password] = @password if @password @redis = Redis.new(options) end @@ -45,24 +58,44 @@ @redis.quit super end def format(tag, time, record) - identifier = [tag, time].join(".") - [identifier, record].to_msgpack + record = inject_values_to_record(tag, time, record) + [tag, time, record].to_msgpack end + def formatted_to_msgpack_binary + true + end + def write(chunk) + tag, time = expand_placeholders(chunk.metadata) @redis.pipelined { - chunk.open { |io| - begin - MessagePack::Unpacker.new(io).each.each_with_index { |record, index| - @redis.mapped_hmset "#{record[0]}.#{index}", record[1] - } - rescue EOFError - # EOFError always occured when reached end of chunk. + unless @allow_duplicate_key + chunk.open { |io| + begin + msgpack_unpacker(io).each.with_index { |record, index| + identifier = [tag, time].join(".") + @redis.mapped_hmset "#{identifier}.#{index}", record[2] + } + rescue EOFError + # EOFError always occured when reached end of chunk. + end + } + else + chunk.each do |_tag, _time, record| + @redis.mapped_hmset "#{tag}", record end - } + end } + end + + private + + def expand_placeholders(metadata) + tag = extract_placeholders(@insert_key_prefix, metadata) + time = extract_placeholders(@strftime_format, metadata) + return tag, time end end end