lib/fluent/plugin/map_support.rb in fluent-plugin-map-0.1.3 vs lib/fluent/plugin/map_support.rb in fluent-plugin-map-0.2.0
- old
+ new
@@ -13,101 +13,103 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Fluent
- class MapSupport
- def initialize(map, plugin)
- @map = map
- @plugin = plugin
- if defined?(Fluent::Filter) and plugin.is_a?(Fluent::Filter)
- singleton_class.module_eval(<<-CODE)
+ module Plugin
+ class MapSupport
+ def initialize(map, plugin)
+ @map = map
+ @plugin = plugin
+ if plugin.is_a?(Fluent::Plugin::Filter)
+ singleton_class.module_eval(<<-CODE)
def map_func(time, record)
#{@map}
end
CODE
- class << self
- alias_method :generate_tuples, :generate_tuples_filter
- alias_method :do_map, :do_map_filter
- end
- elsif plugin.is_a?(Fluent::Output)
- singleton_class.module_eval(<<-CODE)
+ class << self
+ alias_method :generate_tuples, :generate_tuples_filter
+ alias_method :do_map, :do_map_filter
+ end
+ elsif plugin.is_a?(Fluent::Plugin::Output)
+ singleton_class.module_eval(<<-CODE)
def map_func(tag, time, record)
#{@map}
end
CODE
- class << self
- alias_method :generate_tuples, :generate_tuples_output
- alias_method :do_map, :do_map_output
+ class << self
+ alias_method :generate_tuples, :generate_tuples_output
+ alias_method :do_map, :do_map_output
+ end
end
end
- end
- def do_map(tag, es)
- # This method will be overwritten in #initailize.
- end
+ def do_map(tag, es)
+ # This method will be overwritten in #initailize.
+ end
- def do_map_filter(tag, es)
- tuples = generate_tuples(tag, es)
+ def do_map_filter(tag, es)
+ tuples = generate_tuples(tag, es)
- tag_output_es = Hash.new{|h, key| h[key] = Fluent::MultiEventStream.new}
- tuples.each do |time, record|
- if time == nil || record == nil
- raise SyntaxError.new
+ tag_output_es = Hash.new{|h, key| h[key] = Fluent::MultiEventStream.new}
+ tuples.each do |time, record|
+ if time == nil || record == nil
+ raise SyntaxError.new
+ end
+ tag_output_es[tag].add(time, record)
+ @plugin.log.trace { [tag, time, record].inspect }
end
- tag_output_es[tag].add(time, record)
- @plugin.log.trace { [tag, time, record].inspect }
+ tag_output_es
end
- tag_output_es
- end
- def do_map_output(tag, es)
- tuples = generate_tuples(tag, es)
+ def do_map_output(tag, es)
+ tuples = generate_tuples(tag, es)
- tag_output_es = Hash.new{|h, key| h[key] = Fluent::MultiEventStream.new}
- tuples.each do |tag, time, record|
- if time == nil || record == nil
- raise SyntaxError.new
+ tag_output_es = Hash.new{|h, key| h[key] = Fluent::MultiEventStream.new}
+ tuples.each do |tag, time, record|
+ if time == nil || record == nil
+ raise SyntaxError.new
+ end
+ tag_output_es[tag].add(time, record)
+ @plugin.log.trace { [tag, time, record].inspect }
end
- tag_output_es[tag].add(time, record)
- @plugin.log.trace { [tag, time, record].inspect }
+ tag_output_es
end
- tag_output_es
- end
- def generate_tuples
- # This method will be overwritten in #initailize.
- end
+ def generate_tuples
+ # This method will be overwritten in #initailize.
+ end
- def generate_tuples_filter(tag, es)
- tuples = []
- es.each {|time, record|
- timeout_block do
- new_tuple = map_func(time, record)
- tuples.concat new_tuple
- end
- }
- tuples
- end
+ def generate_tuples_filter(tag, es)
+ tuples = []
+ es.each {|time, record|
+ timeout_block do
+ new_tuple = map_func(time, record)
+ tuples.concat new_tuple
+ end
+ }
+ tuples
+ end
- def generate_tuples_output(tag, es)
- tuples = []
- es.each {|time, record|
- timeout_block do
- new_tuple = map_func(tag, time, record)
- tuples.concat new_tuple
- end
- }
- tuples
- end
-
- def timeout_block
- begin
- Timeout.timeout(@plugin.timeout){
- yield
+ def generate_tuples_output(tag, es)
+ tuples = []
+ es.each {|time, record|
+ timeout_block do
+ new_tuple = map_func(tag, time, record)
+ tuples.concat new_tuple
+ end
}
- rescue Timeout::Error
- @plugin.log.error {"Timeout: #{Time.at(time)} #{tag} #{record.inspect}"}
+ tuples
+ end
+
+ def timeout_block
+ begin
+ Timeout.timeout(@plugin.timeout){
+ yield
+ }
+ rescue Timeout::Error
+ @plugin.log.error {"Timeout: #{Time.at(time)} #{tag} #{record.inspect}"}
+ end
end
end
end
end