lib/fluent/plugin/out_yohoushi.rb in fluent-plugin-yohoushi-0.0.5 vs lib/fluent/plugin/out_yohoushi.rb in fluent-plugin-yohoushi-0.1.0

- old
+ new

@@ -1,228 +1,240 @@ -class Fluent::YohoushiOutput < Fluent::Output - Fluent::Plugin.register_output('yohoushi', self) +module Fluent + class YohoushiOutput < BufferedOutput + Plugin.register_output('yohoushi', self) - MAPPING_MAX_NUM = 20 - KEY_MAX_NUM = 20 + MAPPING_MAX_NUM = 20 + KEY_MAX_NUM = 20 - def initialize - super - require 'socket' - require 'multiforecast-client' - require 'yohoushi-client' - end + def initialize + super + require 'socket' + require 'multiforecast-client' + require 'yohoushi-client' + end - config_param :base_uri, :string, :default => nil - (1..MAPPING_MAX_NUM).each {|i| config_param "mapping#{i}".to_sym, :string, :default => nil } - config_param :key_pattern, :string, :default => nil - (1..KEY_MAX_NUM).each {|i| config_param "key#{i}".to_sym, :string, :default => nil } - config_param :enable_float_number, :bool, :default => false - config_param :mode, :default => :gauge do |val| - case val.downcase - when 'gauge' - :gauge - when 'count' - :count - when 'modified' - :modified - when 'derive' - :derive - else - raise Fluent::ConfigError, "stdout output output_type should be `gauge`, `count`, `modified`, or `derive`" + config_param :base_uri, :string, :default => nil + (1..MAPPING_MAX_NUM).each {|i| config_param "mapping#{i}".to_sym, :string, :default => nil } + config_param :key_pattern, :string, :default => nil + (1..KEY_MAX_NUM).each {|i| config_param "key#{i}".to_sym, :string, :default => nil } + config_param :enable_float_number, :bool, :default => false + config_param :mode, :default => :gauge do |val| + case val.downcase + when 'gauge' + :gauge + when 'count' + :count + when 'modified' + :modified + when 'derive' + :derive + else + raise ConfigError, "stdout output output_type should be `gauge`, `count`, `modified`, or `derive`" + end end - end - config_param :enable_ruby, :bool, :default => true # true for lower version compatibility + config_param :enable_ruby, :bool, :default => true # true for lower version compatibility - # for test - attr_reader :client - attr_reader :mapping - attr_reader :keys - attr_reader :key_pattern - attr_reader :key_pattern_path + # Override default parameters of Bufferedoutput options + config_param :buffer_type, :string, :default => 'memory' + config_param :flush_interval, :time, :default => 0 # we can not wait 1 minute to create 1 minute graphs (originally, 60) + config_param :try_flush_interval, :float, :default => 1 # we would be able to shorten more + config_param :retry_limit, :integer, :default => 1 # growthforecast requires a realtime post, so retry only once (originally, 17) + config_param :retry_wait, :time, :default => 1.0 + config_param :max_retry_wait, :time, :default => nil + config_param :num_threads, :integer, :default => 1 + config_param :queued_chunk_flush_interval, :time, :default => 1 - def configure(conf) - super + # for test + attr_reader :client + attr_reader :mapping + attr_reader :keys + attr_reader :key_pattern + attr_reader :key_pattern_path - if @base_uri - @client = Yohoushi::Client.new(@base_uri) - else - @mapping = {} - (1..MAPPING_MAX_NUM).each do |i| - next unless conf["mapping#{i}"] - from, to = conf["mapping#{i}"].split(/ +/, 2) - raise Fluent::ConfigError, "mapping#{i} does not contain 2 parameters" unless to - @mapping[from] = to - end - @client = MultiForecast::Client.new('mapping' => @mapping) unless @mapping.empty? - end - raise Fluent::ConfigError, "Either of `base_uri` or `mapping1` must be specified" unless @client + def configure(conf) + super - if @key_pattern - key_pattern, @key_pattern_path = @key_pattern.split(/ +/, 2) - raise Fluent::ConfigError, "key_pattern does not contain 2 parameters" unless @key_pattern_path - @key_pattern = Regexp.compile(key_pattern) - else - @keys = {} - (1..KEY_MAX_NUM).each do |i| - next unless conf["key#{i}"] - key, path = conf["key#{i}"].split(/ +/, 2) - raise Fluent::ConfigError, "key#{i} does not contain 2 parameters" unless path - @keys[key] = path + if @base_uri + @client = Yohoushi::Client.new(@base_uri) + else + @mapping = {} + (1..MAPPING_MAX_NUM).each do |i| + next unless conf["mapping#{i}"] + from, to = conf["mapping#{i}"].split(/ +/, 2) + raise ConfigError, "mapping#{i} does not contain 2 parameters" unless to + @mapping[from] = to + end + @client = MultiForecast::Client.new('mapping' => @mapping) unless @mapping.empty? end - end - raise Fluent::ConfigError, "Either of `key_pattern` or `key1` must be specified" if (@key_pattern.nil? and @keys.empty?) + raise ConfigError, "Either of `base_uri` or `mapping1` must be specified" unless @client - @placeholder_expander = - if @enable_ruby - # require utilities which would be used in ruby placeholders - require 'pathname' - require 'uri' - require 'cgi' - RubyPlaceholderExpander.new + if @key_pattern + key_pattern, @key_pattern_path = @key_pattern.split(/ +/, 2) + raise ConfigError, "key_pattern does not contain 2 parameters" unless @key_pattern_path + @key_pattern = Regexp.compile(key_pattern) else - PlaceholderExpander.new + @keys = {} + (1..KEY_MAX_NUM).each do |i| + next unless conf["key#{i}"] + key, path = conf["key#{i}"].split(/ +/, 2) + raise ConfigError, "key#{i} does not contain 2 parameters" unless path + @keys[key] = path + end end + raise ConfigError, "Either of `key_pattern` or `key1` must be specified" if (@key_pattern.nil? and @keys.empty?) - @hostname = Socket.gethostname - rescue => e - raise Fluent::ConfigError, "#{e.class} #{e.message} #{e.backtrace.first}" - end + @placeholder_expander = + if @enable_ruby + # require utilities which would be used in ruby placeholders + require 'pathname' + require 'uri' + require 'cgi' + RubyPlaceholderExpander.new + else + PlaceholderExpander.new + end - def start - super - end + @hostname = Socket.gethostname + rescue => e + raise ConfigError, "#{e.class} #{e.message} #{e.backtrace.first}" + end - def shutdown - super - end + def start + super + end - def post(path, number) - if @enable_float_number - @client.post_graph(path, { 'number' => number.to_f, 'mode' => @mode.to_s }) - else - @client.post_graph(path, { 'number' => number.to_i, 'mode' => @mode.to_s }) + def shutdown + super end - rescue => e - $log.warn "out_yohoushi: #{e.class} #{e.message} #{e.backtrace.first}" - end - def emit(tag, es, chain) - tag_parts = tag.split('.') - tag_prefix = tag_prefix(tag_parts) - tag_suffix = tag_suffix(tag_parts) - placeholders = { - 'tag' => tag, - 'tags' => tag_parts, # for lower compatibility - 'tag_parts' => tag_parts, - 'tag_prefix' => tag_prefix, - 'tag_suffix' => tag_suffix, - 'hostname' => @hostname, - } - if @key_pattern - es.each do |time, record| - record.each do |key, value| - next unless key =~ @key_pattern - placeholders['key'] = key - path = expand_placeholder(@key_pattern_path, time, record, placeholders) - post(path, value) - end + def post(path, number) + if @enable_float_number + @client.post_graph(path, { 'number' => number.to_f, 'mode' => @mode.to_s }) + else + @client.post_graph(path, { 'number' => number.to_i, 'mode' => @mode.to_s }) end - else # keys - es.each do |time, record| - @keys.each do |key, path| - next unless value = record[key] - placeholders['key'] = key - path = expand_placeholder(path, time, record, placeholders) - post(path, value) + rescue => e + $log.warn "out_yohoushi: #{e.class} #{e.message} #{e.backtrace.first}" + end + + def format(tag, time, record) + [tag, time, record].to_msgpack + end + + def write(chunk) + chunk.msgpack_each do |tag, time, record| + tag_parts = tag.split('.') + tag_prefix = tag_prefix(tag_parts) + tag_suffix = tag_suffix(tag_parts) + placeholders = { + 'tag' => tag, + 'tags' => tag_parts, # for lower compatibility + 'tag_parts' => tag_parts, + 'tag_prefix' => tag_prefix, + 'tag_suffix' => tag_suffix, + 'hostname' => @hostname, + } + if @key_pattern + record.each do |key, value| + next unless key =~ @key_pattern + placeholders['key'] = key + path = expand_placeholder(@key_pattern_path, time, record, placeholders) + post(path, value) + end + else # keys + @keys.each do |key, path| + next unless value = record[key] + placeholders['key'] = key + path = expand_placeholder(path, time, record, placeholders) + post(path, value) + end end end + rescue => e + $log.warn "out_yohoushi: #{e.class} #{e.message} #{e.backtrace.first}" end - chain.next - rescue => e - $log.warn "out_yohoushi: #{e.class} #{e.message} #{e.backtrace.first}" - end + def expand_placeholder(value, time, record, opts) + @placeholder_expander.prepare_placeholders(time, record, opts) + @placeholder_expander.expand(value) + end - def expand_placeholder(value, time, record, opts) - @placeholder_expander.prepare_placeholders(time, record, opts) - @placeholder_expander.expand(value) - end - - def tag_prefix(tag_parts) - return [] if tag_parts.empty? - tag_prefix = [tag_parts.first] - 1.upto(tag_parts.size-1).each do |i| - tag_prefix[i] = "#{tag_prefix[i-1]}.#{tag_parts[i]}" + def tag_prefix(tag_parts) + return [] if tag_parts.empty? + tag_prefix = [tag_parts.first] + 1.upto(tag_parts.size-1).each do |i| + tag_prefix[i] = "#{tag_prefix[i-1]}.#{tag_parts[i]}" + end + tag_prefix end - tag_prefix - end - def tag_suffix(tag_parts) - return [] if tag_parts.empty? - rev_tag_parts = tag_parts.reverse - rev_tag_suffix = [rev_tag_parts.first] - 1.upto(tag_parts.size-1).each do |i| - rev_tag_suffix[i] = "#{rev_tag_parts[i]}.#{rev_tag_suffix[i-1]}" + def tag_suffix(tag_parts) + return [] if tag_parts.empty? + rev_tag_parts = tag_parts.reverse + rev_tag_suffix = [rev_tag_parts.first] + 1.upto(tag_parts.size-1).each do |i| + rev_tag_suffix[i] = "#{rev_tag_parts[i]}.#{rev_tag_suffix[i-1]}" + end + rev_tag_suffix.reverse end - rev_tag_suffix.reverse - end - class PlaceholderExpander - attr_reader :placeholders + class PlaceholderExpander + attr_reader :placeholders - def prepare_placeholders(time, record, opts) - placeholders = { '${time}' => Time.at(time).to_s } - record.each {|key, val| placeholders.store("${#{key}}", val) } + def prepare_placeholders(time, record, opts) + placeholders = { '${time}' => Time.at(time).to_s } + record.each {|key, value| placeholders.store("${#{key}}", value) } - opts.each do |key, val| - if val.kind_of?(Array) - size = val.size - val.each_with_index { |t, idx| - placeholders.store("${#{key}[#{idx}]}", t) - placeholders.store("${#{key}[#{idx-size}]}", t) # support [-1] - } - else # string, interger, float, and others? - placeholders["${#{key}}"] = val + opts.each do |key, value| + if value.kind_of?(Array) # tag_parts, etc + size = value.size + value.each_with_index { |v, idx| + placeholders.store("${#{key}[#{idx}]}", v) + placeholders.store("${#{key}[#{idx-size}]}", v) # support [-1] + } + else # string, interger, float, and others? + placeholders.store("${#{key}}", value) + end end + + @placeholders = placeholders end - @placeholders = placeholders + def expand(str) + str.gsub(/(\${[a-z_]+(\[-?[0-9]+\])?}|__[A-Z_]+__)/) { + $log.warn "record_reformer: unknown placeholder `#{$1}` found" unless @placeholders.include?($1) + @placeholders[$1] + } + end end - def expand(str) - str.gsub(/(\${[a-z_]+(\[-?[0-9]+\])?}|__[A-Z_]+__)/) { - $log.warn "record_reformer: unknown placeholder `#{$1}` found" unless @placeholders.include?($1) - @placeholders[$1] - } - end - end + class RubyPlaceholderExpander + attr_reader :placeholders - class RubyPlaceholderExpander - attr_reader :placeholders + # Get placeholders as a struct + # + # @param [Time] time the time + # @param [Hash] record the record + # @param [Hash] opts others + def prepare_placeholders(time, record, opts) + struct = UndefOpenStruct.new(record) + struct.time = Time.at(time) + opts.each {|key, value| struct.__send__("#{key}=", value) } + @placeholders = struct + end - # Get placeholders as a struct - # - # @param [Time] time the time - # @param [Hash] record the record - # @param [Hash] opts others - def prepare_placeholders(time, record, opts) - struct = UndefOpenStruct.new(record) - struct.time = Time.at(time) - opts.each {|key, val| struct.__send__("#{key}=", val) } - @placeholders = struct - end + # Replace placeholders in a string + # + # @param [String] str the string to be replaced + def expand(str) + str = str.gsub(/\$\{([^}]+)\}/, '#{\1}') # ${..} => #{..} + eval "\"#{str}\"", @placeholders.instance_eval { binding } + end - # Replace placeholders in a string - # - # @param [String] str the string to be replaced - def expand(str) - str = str.gsub(/\$\{([^}]+)\}/, '#{\1}') # ${..} => #{..} - eval "\"#{str}\"", @placeholders.instance_eval { binding } - end - - class UndefOpenStruct < OpenStruct - (Object.instance_methods).each do |m| - undef_method m unless m.to_s =~ /^__|respond_to_missing\?|object_id|public_methods|instance_eval|method_missing|define_singleton_method|respond_to\?|new_ostruct_member/ + class UndefOpenStruct < OpenStruct + (Object.instance_methods).each do |m| + undef_method m unless m.to_s =~ /^__|respond_to_missing\?|object_id|public_methods|instance_eval|method_missing|define_singleton_method|respond_to\?|new_ostruct_member/ + end end end end end