lib/fluq/feed/msgpack.rb in fluq-0.7.0 vs lib/fluq/feed/msgpack.rb in fluq-0.7.1
- old
+ new
@@ -1,27 +1,27 @@
class FluQ::Feed::Msgpack < FluQ::Feed::Base
- # @see [FluQ::Feed::Base] each
- def each
- buffer.drain do |io|
- pac = MessagePack::Unpacker.new(io)
- pac.each do |hash|
- event = to_event(hash)
- yield event if event
- end
+ # @see FluQ::Feed::Base.to_event
+ def self.to_event(raw)
+ raw = MessagePack.unpack(raw) if raw.is_a?(String)
+
+ case raw
+ when Hash
+ FluQ::Event.new raw.delete("="), raw.delete("@"), raw
+ else
+ logger.warn "buffer contained invalid event #{raw.inspect}"
+ nil
end
- rescue EOFError
end
- private
+ protected
- def to_event(hash)
- case hash
- when Hash
- FluQ::Event.new hash.delete("="), hash.delete("@"), hash
- else
- logger.warn "buffer contained invalid event #{hash.inspect}"
- nil
+ # @see [FluQ::Feed::Base] each
+ def each_raw(&block)
+ buffer.drain do |io|
+ pac = MessagePack::Unpacker.new(io)
+ pac.each(&block)
end
+ rescue EOFError
end
end
\ No newline at end of file