lib/fluq/feed/msgpack.rb in fluq-0.7.0 vs lib/fluq/feed/msgpack.rb in fluq-0.7.1

- old
+ new

@@ -1,27 +1,27 @@ class FluQ::Feed::Msgpack < FluQ::Feed::Base - # @see [FluQ::Feed::Base] each - def each - buffer.drain do |io| - pac = MessagePack::Unpacker.new(io) - pac.each do |hash| - event = to_event(hash) - yield event if event - end + # @see FluQ::Feed::Base.to_event + def self.to_event(raw) + raw = MessagePack.unpack(raw) if raw.is_a?(String) + + case raw + when Hash + FluQ::Event.new raw.delete("="), raw.delete("@"), raw + else + logger.warn "buffer contained invalid event #{raw.inspect}" + nil end - rescue EOFError end - private + protected - def to_event(hash) - case hash - when Hash - FluQ::Event.new hash.delete("="), hash.delete("@"), hash - else - logger.warn "buffer contained invalid event #{hash.inspect}" - nil + # @see [FluQ::Feed::Base] each + def each_raw(&block) + buffer.drain do |io| + pac = MessagePack::Unpacker.new(io) + pac.each(&block) end + rescue EOFError end end \ No newline at end of file