lib/fluent/plugin/out_yohoushi.rb in fluent-plugin-yohoushi-0.0.5 vs lib/fluent/plugin/out_yohoushi.rb in fluent-plugin-yohoushi-0.1.0
- old
+ new
@@ -1,228 +1,240 @@
-class Fluent::YohoushiOutput < Fluent::Output
- Fluent::Plugin.register_output('yohoushi', self)
+module Fluent
+ class YohoushiOutput < BufferedOutput
+ Plugin.register_output('yohoushi', self)
- MAPPING_MAX_NUM = 20
- KEY_MAX_NUM = 20
+ MAPPING_MAX_NUM = 20
+ KEY_MAX_NUM = 20
- def initialize
- super
- require 'socket'
- require 'multiforecast-client'
- require 'yohoushi-client'
- end
+ def initialize
+ super
+ require 'socket'
+ require 'multiforecast-client'
+ require 'yohoushi-client'
+ end
- config_param :base_uri, :string, :default => nil
- (1..MAPPING_MAX_NUM).each {|i| config_param "mapping#{i}".to_sym, :string, :default => nil }
- config_param :key_pattern, :string, :default => nil
- (1..KEY_MAX_NUM).each {|i| config_param "key#{i}".to_sym, :string, :default => nil }
- config_param :enable_float_number, :bool, :default => false
- config_param :mode, :default => :gauge do |val|
- case val.downcase
- when 'gauge'
- :gauge
- when 'count'
- :count
- when 'modified'
- :modified
- when 'derive'
- :derive
- else
- raise Fluent::ConfigError, "stdout output output_type should be `gauge`, `count`, `modified`, or `derive`"
+ config_param :base_uri, :string, :default => nil
+ (1..MAPPING_MAX_NUM).each {|i| config_param "mapping#{i}".to_sym, :string, :default => nil }
+ config_param :key_pattern, :string, :default => nil
+ (1..KEY_MAX_NUM).each {|i| config_param "key#{i}".to_sym, :string, :default => nil }
+ config_param :enable_float_number, :bool, :default => false
+ config_param :mode, :default => :gauge do |val|
+ case val.downcase
+ when 'gauge'
+ :gauge
+ when 'count'
+ :count
+ when 'modified'
+ :modified
+ when 'derive'
+ :derive
+ else
+ raise ConfigError, "stdout output output_type should be `gauge`, `count`, `modified`, or `derive`"
+ end
end
- end
- config_param :enable_ruby, :bool, :default => true # true for lower version compatibility
+ config_param :enable_ruby, :bool, :default => true # true for lower version compatibility
- # for test
- attr_reader :client
- attr_reader :mapping
- attr_reader :keys
- attr_reader :key_pattern
- attr_reader :key_pattern_path
+ # Override default parameters of Bufferedoutput options
+ config_param :buffer_type, :string, :default => 'memory'
+ config_param :flush_interval, :time, :default => 0 # we can not wait 1 minute to create 1 minute graphs (originally, 60)
+ config_param :try_flush_interval, :float, :default => 1 # we would be able to shorten more
+ config_param :retry_limit, :integer, :default => 1 # growthforecast requires a realtime post, so retry only once (originally, 17)
+ config_param :retry_wait, :time, :default => 1.0
+ config_param :max_retry_wait, :time, :default => nil
+ config_param :num_threads, :integer, :default => 1
+ config_param :queued_chunk_flush_interval, :time, :default => 1
- def configure(conf)
- super
+ # for test
+ attr_reader :client
+ attr_reader :mapping
+ attr_reader :keys
+ attr_reader :key_pattern
+ attr_reader :key_pattern_path
- if @base_uri
- @client = Yohoushi::Client.new(@base_uri)
- else
- @mapping = {}
- (1..MAPPING_MAX_NUM).each do |i|
- next unless conf["mapping#{i}"]
- from, to = conf["mapping#{i}"].split(/ +/, 2)
- raise Fluent::ConfigError, "mapping#{i} does not contain 2 parameters" unless to
- @mapping[from] = to
- end
- @client = MultiForecast::Client.new('mapping' => @mapping) unless @mapping.empty?
- end
- raise Fluent::ConfigError, "Either of `base_uri` or `mapping1` must be specified" unless @client
+ def configure(conf)
+ super
- if @key_pattern
- key_pattern, @key_pattern_path = @key_pattern.split(/ +/, 2)
- raise Fluent::ConfigError, "key_pattern does not contain 2 parameters" unless @key_pattern_path
- @key_pattern = Regexp.compile(key_pattern)
- else
- @keys = {}
- (1..KEY_MAX_NUM).each do |i|
- next unless conf["key#{i}"]
- key, path = conf["key#{i}"].split(/ +/, 2)
- raise Fluent::ConfigError, "key#{i} does not contain 2 parameters" unless path
- @keys[key] = path
+ if @base_uri
+ @client = Yohoushi::Client.new(@base_uri)
+ else
+ @mapping = {}
+ (1..MAPPING_MAX_NUM).each do |i|
+ next unless conf["mapping#{i}"]
+ from, to = conf["mapping#{i}"].split(/ +/, 2)
+ raise ConfigError, "mapping#{i} does not contain 2 parameters" unless to
+ @mapping[from] = to
+ end
+ @client = MultiForecast::Client.new('mapping' => @mapping) unless @mapping.empty?
end
- end
- raise Fluent::ConfigError, "Either of `key_pattern` or `key1` must be specified" if (@key_pattern.nil? and @keys.empty?)
+ raise ConfigError, "Either of `base_uri` or `mapping1` must be specified" unless @client
- @placeholder_expander =
- if @enable_ruby
- # require utilities which would be used in ruby placeholders
- require 'pathname'
- require 'uri'
- require 'cgi'
- RubyPlaceholderExpander.new
+ if @key_pattern
+ key_pattern, @key_pattern_path = @key_pattern.split(/ +/, 2)
+ raise ConfigError, "key_pattern does not contain 2 parameters" unless @key_pattern_path
+ @key_pattern = Regexp.compile(key_pattern)
else
- PlaceholderExpander.new
+ @keys = {}
+ (1..KEY_MAX_NUM).each do |i|
+ next unless conf["key#{i}"]
+ key, path = conf["key#{i}"].split(/ +/, 2)
+ raise ConfigError, "key#{i} does not contain 2 parameters" unless path
+ @keys[key] = path
+ end
end
+ raise ConfigError, "Either of `key_pattern` or `key1` must be specified" if (@key_pattern.nil? and @keys.empty?)
- @hostname = Socket.gethostname
- rescue => e
- raise Fluent::ConfigError, "#{e.class} #{e.message} #{e.backtrace.first}"
- end
+ @placeholder_expander =
+ if @enable_ruby
+ # require utilities which would be used in ruby placeholders
+ require 'pathname'
+ require 'uri'
+ require 'cgi'
+ RubyPlaceholderExpander.new
+ else
+ PlaceholderExpander.new
+ end
- def start
- super
- end
+ @hostname = Socket.gethostname
+ rescue => e
+ raise ConfigError, "#{e.class} #{e.message} #{e.backtrace.first}"
+ end
- def shutdown
- super
- end
+ def start
+ super
+ end
- def post(path, number)
- if @enable_float_number
- @client.post_graph(path, { 'number' => number.to_f, 'mode' => @mode.to_s })
- else
- @client.post_graph(path, { 'number' => number.to_i, 'mode' => @mode.to_s })
+ def shutdown
+ super
end
- rescue => e
- $log.warn "out_yohoushi: #{e.class} #{e.message} #{e.backtrace.first}"
- end
- def emit(tag, es, chain)
- tag_parts = tag.split('.')
- tag_prefix = tag_prefix(tag_parts)
- tag_suffix = tag_suffix(tag_parts)
- placeholders = {
- 'tag' => tag,
- 'tags' => tag_parts, # for lower compatibility
- 'tag_parts' => tag_parts,
- 'tag_prefix' => tag_prefix,
- 'tag_suffix' => tag_suffix,
- 'hostname' => @hostname,
- }
- if @key_pattern
- es.each do |time, record|
- record.each do |key, value|
- next unless key =~ @key_pattern
- placeholders['key'] = key
- path = expand_placeholder(@key_pattern_path, time, record, placeholders)
- post(path, value)
- end
+ def post(path, number)
+ if @enable_float_number
+ @client.post_graph(path, { 'number' => number.to_f, 'mode' => @mode.to_s })
+ else
+ @client.post_graph(path, { 'number' => number.to_i, 'mode' => @mode.to_s })
end
- else # keys
- es.each do |time, record|
- @keys.each do |key, path|
- next unless value = record[key]
- placeholders['key'] = key
- path = expand_placeholder(path, time, record, placeholders)
- post(path, value)
+ rescue => e
+ $log.warn "out_yohoushi: #{e.class} #{e.message} #{e.backtrace.first}"
+ end
+
+ def format(tag, time, record)
+ [tag, time, record].to_msgpack
+ end
+
+ def write(chunk)
+ chunk.msgpack_each do |tag, time, record|
+ tag_parts = tag.split('.')
+ tag_prefix = tag_prefix(tag_parts)
+ tag_suffix = tag_suffix(tag_parts)
+ placeholders = {
+ 'tag' => tag,
+ 'tags' => tag_parts, # for lower compatibility
+ 'tag_parts' => tag_parts,
+ 'tag_prefix' => tag_prefix,
+ 'tag_suffix' => tag_suffix,
+ 'hostname' => @hostname,
+ }
+ if @key_pattern
+ record.each do |key, value|
+ next unless key =~ @key_pattern
+ placeholders['key'] = key
+ path = expand_placeholder(@key_pattern_path, time, record, placeholders)
+ post(path, value)
+ end
+ else # keys
+ @keys.each do |key, path|
+ next unless value = record[key]
+ placeholders['key'] = key
+ path = expand_placeholder(path, time, record, placeholders)
+ post(path, value)
+ end
end
end
+ rescue => e
+ $log.warn "out_yohoushi: #{e.class} #{e.message} #{e.backtrace.first}"
end
- chain.next
- rescue => e
- $log.warn "out_yohoushi: #{e.class} #{e.message} #{e.backtrace.first}"
- end
+ def expand_placeholder(value, time, record, opts)
+ @placeholder_expander.prepare_placeholders(time, record, opts)
+ @placeholder_expander.expand(value)
+ end
- def expand_placeholder(value, time, record, opts)
- @placeholder_expander.prepare_placeholders(time, record, opts)
- @placeholder_expander.expand(value)
- end
-
- def tag_prefix(tag_parts)
- return [] if tag_parts.empty?
- tag_prefix = [tag_parts.first]
- 1.upto(tag_parts.size-1).each do |i|
- tag_prefix[i] = "#{tag_prefix[i-1]}.#{tag_parts[i]}"
+ def tag_prefix(tag_parts)
+ return [] if tag_parts.empty?
+ tag_prefix = [tag_parts.first]
+ 1.upto(tag_parts.size-1).each do |i|
+ tag_prefix[i] = "#{tag_prefix[i-1]}.#{tag_parts[i]}"
+ end
+ tag_prefix
end
- tag_prefix
- end
- def tag_suffix(tag_parts)
- return [] if tag_parts.empty?
- rev_tag_parts = tag_parts.reverse
- rev_tag_suffix = [rev_tag_parts.first]
- 1.upto(tag_parts.size-1).each do |i|
- rev_tag_suffix[i] = "#{rev_tag_parts[i]}.#{rev_tag_suffix[i-1]}"
+ def tag_suffix(tag_parts)
+ return [] if tag_parts.empty?
+ rev_tag_parts = tag_parts.reverse
+ rev_tag_suffix = [rev_tag_parts.first]
+ 1.upto(tag_parts.size-1).each do |i|
+ rev_tag_suffix[i] = "#{rev_tag_parts[i]}.#{rev_tag_suffix[i-1]}"
+ end
+ rev_tag_suffix.reverse
end
- rev_tag_suffix.reverse
- end
- class PlaceholderExpander
- attr_reader :placeholders
+ class PlaceholderExpander
+ attr_reader :placeholders
- def prepare_placeholders(time, record, opts)
- placeholders = { '${time}' => Time.at(time).to_s }
- record.each {|key, val| placeholders.store("${#{key}}", val) }
+ def prepare_placeholders(time, record, opts)
+ placeholders = { '${time}' => Time.at(time).to_s }
+ record.each {|key, value| placeholders.store("${#{key}}", value) }
- opts.each do |key, val|
- if val.kind_of?(Array)
- size = val.size
- val.each_with_index { |t, idx|
- placeholders.store("${#{key}[#{idx}]}", t)
- placeholders.store("${#{key}[#{idx-size}]}", t) # support [-1]
- }
- else # string, interger, float, and others?
- placeholders["${#{key}}"] = val
+ opts.each do |key, value|
+ if value.kind_of?(Array) # tag_parts, etc
+ size = value.size
+ value.each_with_index { |v, idx|
+ placeholders.store("${#{key}[#{idx}]}", v)
+ placeholders.store("${#{key}[#{idx-size}]}", v) # support [-1]
+ }
+ else # string, interger, float, and others?
+ placeholders.store("${#{key}}", value)
+ end
end
+
+ @placeholders = placeholders
end
- @placeholders = placeholders
+ def expand(str)
+ str.gsub(/(\${[a-z_]+(\[-?[0-9]+\])?}|__[A-Z_]+__)/) {
+ $log.warn "record_reformer: unknown placeholder `#{$1}` found" unless @placeholders.include?($1)
+ @placeholders[$1]
+ }
+ end
end
- def expand(str)
- str.gsub(/(\${[a-z_]+(\[-?[0-9]+\])?}|__[A-Z_]+__)/) {
- $log.warn "record_reformer: unknown placeholder `#{$1}` found" unless @placeholders.include?($1)
- @placeholders[$1]
- }
- end
- end
+ class RubyPlaceholderExpander
+ attr_reader :placeholders
- class RubyPlaceholderExpander
- attr_reader :placeholders
+ # Get placeholders as a struct
+ #
+ # @param [Time] time the time
+ # @param [Hash] record the record
+ # @param [Hash] opts others
+ def prepare_placeholders(time, record, opts)
+ struct = UndefOpenStruct.new(record)
+ struct.time = Time.at(time)
+ opts.each {|key, value| struct.__send__("#{key}=", value) }
+ @placeholders = struct
+ end
- # Get placeholders as a struct
- #
- # @param [Time] time the time
- # @param [Hash] record the record
- # @param [Hash] opts others
- def prepare_placeholders(time, record, opts)
- struct = UndefOpenStruct.new(record)
- struct.time = Time.at(time)
- opts.each {|key, val| struct.__send__("#{key}=", val) }
- @placeholders = struct
- end
+ # Replace placeholders in a string
+ #
+ # @param [String] str the string to be replaced
+ def expand(str)
+ str = str.gsub(/\$\{([^}]+)\}/, '#{\1}') # ${..} => #{..}
+ eval "\"#{str}\"", @placeholders.instance_eval { binding }
+ end
- # Replace placeholders in a string
- #
- # @param [String] str the string to be replaced
- def expand(str)
- str = str.gsub(/\$\{([^}]+)\}/, '#{\1}') # ${..} => #{..}
- eval "\"#{str}\"", @placeholders.instance_eval { binding }
- end
-
- class UndefOpenStruct < OpenStruct
- (Object.instance_methods).each do |m|
- undef_method m unless m.to_s =~ /^__|respond_to_missing\?|object_id|public_methods|instance_eval|method_missing|define_singleton_method|respond_to\?|new_ostruct_member/
+ class UndefOpenStruct < OpenStruct
+ (Object.instance_methods).each do |m|
+ undef_method m unless m.to_s =~ /^__|respond_to_missing\?|object_id|public_methods|instance_eval|method_missing|define_singleton_method|respond_to\?|new_ostruct_member/
+ end
end
end
end
end