class Fluent::UnitTimeFilterOutput < Fluent::Output class Buffer include Enumerable def initialize(options) @init_queue = [] @fiber = Fiber.new do |tag, es| @init_queue << [tag, es] run(options) end end def each loop do tag, es = @init_queue.shift || Fiber.yield break unless (tag && es) es.each do |time, record| yield([tag, time, record]) end end end def resume(tag, es) @fiber.resume(tag, es) end private def run(options) filter = options[:filter] unit_sec = options[:unit_sec] prefix = options[:prefix] emit_each_tag = options[:emit_each_tag] pass_hash_row = options[:pass_hash_row] prev_time = nil self.chunk {|tag, time, record| time }.slice_before {|time, records| result = (time % unit_sec).zero? || (prev_time && minus_r(time, unit_sec) != minus_r(prev_time, unit_sec)) prev_time = time result }.each {|records| records = records.inject([]) {|r, i| r + i[1] } records = conv_to_hash_rows_if_needed(records, options) time = get_time(records.first, options) time -= time % unit_sec rs = filter.call(records) rs = [rs] unless rs.kind_of?(Array) if emit_each_tag tags = records.map {|r| get_tag(r, options) }.uniq tags.each {|t| emit_records(prefix, t, time, rs) } else tag = get_tag(records.first, options) emit_records(prefix, tag, time, rs) end } end def emit_records(prefix, tag, time, records) records.each do |record| Fluent::Engine.emit("#{prefix}.#{tag}", time, record) end end def conv_to_hash_rows_if_needed(records, options) return records unless options[:pass_hash_row] time_key = options[:hash_row_time_key] tag_key = options[:hash_row_tag_key] records.map do |tag, time, r| r[time_key] = time r[tag_key] = tag r end end def get_time(record, options) time_key = options[:hash_row_time_key] options[:pass_hash_row] ? record[time_key] : record[1] end def get_tag(record, options) tag_key = options[:hash_row_tag_key] options[:pass_hash_row] ? record[tag_key] : record[0] end def minus_r(m, n) m - (m % n) end end end