lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.2.3 vs lib/fluent/plugin/out_redis.rb in fluent-plugin-redis-0.3.0
- old
+ new
@@ -1,42 +1,55 @@
-module Fluent
- class RedisOutput < BufferedOutput
+require 'redis'
+require 'msgpack'
+require 'fluent/msgpack_factory'
+require 'fluent/plugin/output'
+
+module Fluent::Plugin
+ class RedisOutput < Output
+ include Fluent::MessagePackFactory::Mixin
+
Fluent::Plugin.register_output('redis', self)
+
+ helpers :compat_parameters, :inject
+
+ DEFAULT_BUFFER_TYPE = "memory"
+
attr_reader :redis
- config_param :host, :string, :default => 'localhost'
- config_param :port, :integer, :default => 6379
- config_param :db_number, :integer, :default => 0
- config_param :password, :string, :default => nil, :secret => true
+ config_param :host, :string, default: 'localhost'
+ config_param :port, :integer, default: 6379
+ config_param :db_number, :integer, default: 0
+ config_param :password, :string, default: nil, secret: true
+ config_param :insert_key_prefix, :string, default: "${tag}"
+ config_param :strftime_format, :string, default: "%s"
+ config_param :allow_duplicate_key, :bool, default: false
- # To support log_level option implemented by Fluentd v0.10.43
- unless method_defined?(:log)
- define_method("log") { $log }
+ config_section :buffer do
+ config_set_default :@type, DEFAULT_BUFFER_TYPE
+ config_set_default :chunk_keys, ['tag', 'time']
+ config_set_default :timekey, 60
end
- def initialize
- super
- require 'redis'
- require 'msgpack'
- end
-
def configure(conf)
+ compat_parameters_convert(conf, :buffer, :inject)
super
if conf.has_key?('namespace')
log.warn "namespace option has been removed from fluent-plugin-redis 0.1.3. Please add or remove the namespace '#{conf['namespace']}' manually."
end
+ raise Fluent::ConfigError, "'tag' in chunk_keys is required." if not @chunk_key_tag
+ raise Fluent::ConfigError, "'time' in chunk_keys is required." if not @chunk_key_time
end
def start
super
options = {
- :host => @host,
- :port => @port,
- :thread_safe => true,
- :db => @db_number
+ host: @host,
+ port: @port,
+ thread_safe: true,
+ db: @db_number
}
options[:password] = @password if @password
@redis = Redis.new(options)
end
@@ -45,24 +58,44 @@
@redis.quit
super
end
def format(tag, time, record)
- identifier = [tag, time].join(".")
- [identifier, record].to_msgpack
+ record = inject_values_to_record(tag, time, record)
+ [tag, time, record].to_msgpack
end
+ def formatted_to_msgpack_binary
+ true
+ end
+
def write(chunk)
+ tag, time = expand_placeholders(chunk.metadata)
@redis.pipelined {
- chunk.open { |io|
- begin
- MessagePack::Unpacker.new(io).each.each_with_index { |record, index|
- @redis.mapped_hmset "#{record[0]}.#{index}", record[1]
- }
- rescue EOFError
- # EOFError always occured when reached end of chunk.
+ unless @allow_duplicate_key
+ chunk.open { |io|
+ begin
+ msgpack_unpacker(io).each.with_index { |record, index|
+ identifier = [tag, time].join(".")
+ @redis.mapped_hmset "#{identifier}.#{index}", record[2]
+ }
+ rescue EOFError
+ # EOFError always occured when reached end of chunk.
+ end
+ }
+ else
+ chunk.each do |_tag, _time, record|
+ @redis.mapped_hmset "#{tag}", record
end
- }
+ end
}
+ end
+
+ private
+
+ def expand_placeholders(metadata)
+ tag = extract_placeholders(@insert_key_prefix, metadata)
+ time = extract_placeholders(@strftime_format, metadata)
+ return tag, time
end
end
end