Sha256: 0eeb8896af24b8d2b642eee8424b0764c45f113ef85559e72c63ae6c8b6d5b4f

Contents?: true

Size: 560 Bytes

Versions: 3

Compression:

Stored size: 560 Bytes

Contents

class FluQ::Feed::Msgpack < FluQ::Feed::Base

  # @see FluQ::Feed::Base.to_event
  def self.to_event(raw)
    raw = MessagePack.unpack(raw) if raw.is_a?(String)

    case raw
    when Hash
      FluQ::Event.new raw.delete("="), raw.delete("@"), raw
    else
      logger.warn "buffer contained invalid event #{raw.inspect}"
      nil
    end
  end

  protected

    # @see [FluQ::Feed::Base] each
    def each_raw(&block)
      buffer.drain do |io|
        pac = MessagePack::Unpacker.new(io)
        pac.each(&block)
      end
    rescue EOFError
    end

end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
fluq-0.7.5 lib/fluq/feed/msgpack.rb
fluq-0.7.3 lib/fluq/feed/msgpack.rb
fluq-0.7.1 lib/fluq/feed/msgpack.rb