lib/fluent/plugin/map_support.rb in fluent-plugin-map-0.1.3 vs lib/fluent/plugin/map_support.rb in fluent-plugin-map-0.2.0

- old
+ new

@@ -13,101 +13,103 @@ # See the License for the specific language governing permissions and # limitations under the License. # module Fluent - class MapSupport - def initialize(map, plugin) - @map = map - @plugin = plugin - if defined?(Fluent::Filter) and plugin.is_a?(Fluent::Filter) - singleton_class.module_eval(<<-CODE) + module Plugin + class MapSupport + def initialize(map, plugin) + @map = map + @plugin = plugin + if plugin.is_a?(Fluent::Plugin::Filter) + singleton_class.module_eval(<<-CODE) def map_func(time, record) #{@map} end CODE - class << self - alias_method :generate_tuples, :generate_tuples_filter - alias_method :do_map, :do_map_filter - end - elsif plugin.is_a?(Fluent::Output) - singleton_class.module_eval(<<-CODE) + class << self + alias_method :generate_tuples, :generate_tuples_filter + alias_method :do_map, :do_map_filter + end + elsif plugin.is_a?(Fluent::Plugin::Output) + singleton_class.module_eval(<<-CODE) def map_func(tag, time, record) #{@map} end CODE - class << self - alias_method :generate_tuples, :generate_tuples_output - alias_method :do_map, :do_map_output + class << self + alias_method :generate_tuples, :generate_tuples_output + alias_method :do_map, :do_map_output + end end end - end - def do_map(tag, es) - # This method will be overwritten in #initailize. - end + def do_map(tag, es) + # This method will be overwritten in #initailize. + end - def do_map_filter(tag, es) - tuples = generate_tuples(tag, es) + def do_map_filter(tag, es) + tuples = generate_tuples(tag, es) - tag_output_es = Hash.new{|h, key| h[key] = Fluent::MultiEventStream.new} - tuples.each do |time, record| - if time == nil || record == nil - raise SyntaxError.new + tag_output_es = Hash.new{|h, key| h[key] = Fluent::MultiEventStream.new} + tuples.each do |time, record| + if time == nil || record == nil + raise SyntaxError.new + end + tag_output_es[tag].add(time, record) + @plugin.log.trace { [tag, time, record].inspect } end - tag_output_es[tag].add(time, record) - @plugin.log.trace { [tag, time, record].inspect } + tag_output_es end - tag_output_es - end - def do_map_output(tag, es) - tuples = generate_tuples(tag, es) + def do_map_output(tag, es) + tuples = generate_tuples(tag, es) - tag_output_es = Hash.new{|h, key| h[key] = Fluent::MultiEventStream.new} - tuples.each do |tag, time, record| - if time == nil || record == nil - raise SyntaxError.new + tag_output_es = Hash.new{|h, key| h[key] = Fluent::MultiEventStream.new} + tuples.each do |tag, time, record| + if time == nil || record == nil + raise SyntaxError.new + end + tag_output_es[tag].add(time, record) + @plugin.log.trace { [tag, time, record].inspect } end - tag_output_es[tag].add(time, record) - @plugin.log.trace { [tag, time, record].inspect } + tag_output_es end - tag_output_es - end - def generate_tuples - # This method will be overwritten in #initailize. - end + def generate_tuples + # This method will be overwritten in #initailize. + end - def generate_tuples_filter(tag, es) - tuples = [] - es.each {|time, record| - timeout_block do - new_tuple = map_func(time, record) - tuples.concat new_tuple - end - } - tuples - end + def generate_tuples_filter(tag, es) + tuples = [] + es.each {|time, record| + timeout_block do + new_tuple = map_func(time, record) + tuples.concat new_tuple + end + } + tuples + end - def generate_tuples_output(tag, es) - tuples = [] - es.each {|time, record| - timeout_block do - new_tuple = map_func(tag, time, record) - tuples.concat new_tuple - end - } - tuples - end - - def timeout_block - begin - Timeout.timeout(@plugin.timeout){ - yield + def generate_tuples_output(tag, es) + tuples = [] + es.each {|time, record| + timeout_block do + new_tuple = map_func(tag, time, record) + tuples.concat new_tuple + end } - rescue Timeout::Error - @plugin.log.error {"Timeout: #{Time.at(time)} #{tag} #{record.inspect}"} + tuples + end + + def timeout_block + begin + Timeout.timeout(@plugin.timeout){ + yield + } + rescue Timeout::Error + @plugin.log.error {"Timeout: #{Time.at(time)} #{tag} #{record.inspect}"} + end end end end end