Sha256: ea13406a0ab172363bfdcf10bc489466ad936466229be97c9c92a033f2f5ef98

Contents?: true

Size: 624 Bytes

Versions: 3

Compression:

Stored size: 624 Bytes

Contents

class FluQ::Feed::Tsv < FluQ::Feed::Base

  # @see FluQ::Feed::Base.to_event
  def self.to_event(raw)
    tag, timestamp, json = raw.split("\t")

    case hash = Oj.load(json)
    when Hash
      FluQ::Event.new tag, timestamp, hash
    else
      logger.warn "buffer contained invalid event #{hash.inspect}"
      nil
    end
  rescue Oj::ParseError, ArgumentError
    logger.warn "buffer contained invalid line #{raw.inspect}"
    nil
  end

  protected

    # @see [FluQ::Feed::Base] each_raw
    def each_raw
      buffer.drain do |io|
        while line = io.gets
          yield line
        end
      end
    end

end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
fluq-0.7.5 lib/fluq/feed/tsv.rb
fluq-0.7.3 lib/fluq/feed/tsv.rb
fluq-0.7.1 lib/fluq/feed/tsv.rb