lib/fluq/feed/tsv.rb in fluq-0.7.0 vs lib/fluq/feed/tsv.rb in fluq-0.7.1

- old
+ new

@@ -1,30 +1,30 @@ class FluQ::Feed::Tsv < FluQ::Feed::Base - # @see [FluQ::Feed::Base] each - def each - buffer.drain do |io| - while line = io.gets - event = to_event(line) - yield event if event - end + # @see FluQ::Feed::Base.to_event + def self.to_event(raw) + tag, timestamp, json = raw.split("\t") + + case hash = Oj.load(json) + when Hash + FluQ::Event.new tag, timestamp, hash + else + logger.warn "buffer contained invalid event #{hash.inspect}" + nil end + rescue Oj::ParseError, ArgumentError + logger.warn "buffer contained invalid line #{raw.inspect}" + nil end - private + protected - def to_event(line) - tag, timestamp, json = line.split("\t") - - case hash = Oj.load(json) - when Hash - FluQ::Event.new tag, timestamp, hash - else - logger.warn "buffer contained invalid event #{[tag, timestamp, hash].inspect}" - nil + # @see [FluQ::Feed::Base] each_raw + def each_raw + buffer.drain do |io| + while line = io.gets + yield line + end end - rescue Oj::ParseError, ArgumentError - logger.warn "buffer contained invalid line #{line.inspect}" - nil end end \ No newline at end of file