lib/aws-eventstream/decoder.rb in aws-eventstream-1.0.3 vs lib/aws-eventstream/decoder.rb in aws-eventstream-1.1.0

- old
+ new

@@ -1,11 +1,11 @@ require 'stringio' require 'tempfile' require 'zlib' module Aws - module EventStream + module EventStream # This class provides method for decoding binary inputs into # single or multiple messages (Aws::EventStream::Message). # # * {#decode} - decodes messages from an IO like object responds @@ -26,11 +26,11 @@ # # # alternatively # message_pool = decoder.decode(io) # message_pool.next # # => Aws::EventStream::Message - # + # # * {#decode_chunk} - decodes a single message from a chunk of data, # returning message object followed by boolean(indicating eof status # of data) in an array object # # ## Examples @@ -61,46 +61,45 @@ class Decoder include Enumerable ONE_MEGABYTE = 1024 * 1024 + private_constant :ONE_MEGABYTE # bytes of prelude part, including 4 bytes of # total message length, headers length and crc checksum of prelude PRELUDE_LENGTH = 12 + private_constant :PRELUDE_LENGTH - # bytes of total overhead in a message, including prelude - # and 4 bytes total message crc checksum - OVERHEAD_LENGTH = 16 + # 4 bytes message crc checksum + CRC32_LENGTH = 4 + private_constant :CRC32_LENGTH - # @options options [Boolean] format (true) When `false` - # disable user-friendly formatting for message header values + # @param [Hash] options The initialization options. + # @option options [Boolean] :format (true) When `false` it + # disables user-friendly formatting for message header values # including timestamp and uuid etc. - # def initialize(options = {}) @format = options.fetch(:format, true) - @message_buffer = BytesBuffer.new('') + @message_buffer = '' end - # @returns [BytesBuffer] - attr_reader :message_buffer - # Decodes messages from a binary stream # # @param [IO#read] io An IO-like object # that responds to `#read` # # @yieldparam [Message] message # @return [Enumerable<Message>, nil] Returns a new Enumerable # containing decoded messages if no block is given def decode(io, &block) - io = BytesBuffer.new(io.read) - return decode_io(io) unless block_given? - until io.eof? - # fetch message only - yield(decode_message(io).first) - end + raw_message = io.read + decoded_message = decode_message(raw_message) + return wrap_as_enumerator(decoded_message) unless block_given? + # fetch message only + raw_event, _eof = decoded_message + block.call(raw_event) end # Decodes a single message from a chunk of string # # @param [String] chunk A chunk of string to be decoded, @@ -109,99 +108,85 @@ # # @return [Array<Message|nil, Boolean>] Returns single decoded message # and boolean pair, the boolean flag indicates whether this chunk # has been fully consumed, unused data is tracked at #message_buffer def decode_chunk(chunk = nil) - @message_buffer.write(chunk) if chunk - @message_buffer.rewind + @message_buffer = [@message_buffer, chunk].pack('a*a*') if chunk decode_message(@message_buffer) end private - def decode_io(io) - ::Enumerator.new {|e| e << decode_message(io) unless io.eof? } + # exposed via object.send for testing + attr_reader :message_buffer + + def wrap_as_enumerator(decoded_message) + Enumerator.new do |yielder| + yielder << decoded_message + end end - def decode_message(io) - # incomplete message prelude received, leave it in the buffer - return [nil, true] if io.bytesize < PRELUDE_LENGTH + def decode_message(raw_message) + # incomplete message prelude received + return [nil, true] if raw_message.bytesize < PRELUDE_LENGTH + prelude, content = raw_message.unpack("a#{PRELUDE_LENGTH}a*") + # decode prelude - total_len, headers_len, prelude_buffer = prelude(io) + total_length, header_length = decode_prelude(prelude) # incomplete message received, leave it in the buffer - return [nil, true] if io.bytesize < total_len + return [nil, true] if raw_message.bytesize < total_length + content, checksum, remaining = content.unpack("a#{total_length - PRELUDE_LENGTH - CRC32_LENGTH}Na*") + unless Zlib.crc32([prelude, content].pack('a*a*')) == checksum + raise Errors::MessageChecksumError + end + # decode headers and payload - headers, payload = context(io, total_len, headers_len, prelude_buffer) + headers, payload = decode_context(content, header_length) - # track extra message data in the buffer if exists - # for #decode_chunk, io is @message_buffer - if eof = io.eof? - @message_buffer.clear! - else - @message_buffer = BytesBuffer.new(@message_buffer.read) - end + @message_buffer = remaining - [Message.new(headers: headers, payload: payload), eof] + [Message.new(headers: headers, payload: payload), remaining.empty?] end - def prelude(io) - # buffer prelude into bytes buffer + def decode_prelude(prelude) # prelude contains length of message and headers, # followed with CRC checksum of itself - buffer = BytesBuffer.new(io.read(PRELUDE_LENGTH)) - - # prelude checksum takes last 4 bytes - checksum = Zlib.crc32(buffer.read(PRELUDE_LENGTH - 4)) - unless checksum == unpack_uint32(buffer) - raise Errors::PreludeChecksumError - end - - buffer.rewind - total_len, headers_len, _ = buffer.read.unpack('N*') - [total_len, headers_len, buffer] + content, checksum = prelude.unpack("a#{PRELUDE_LENGTH - CRC32_LENGTH}N") + raise Errors::PreludeChecksumError unless Zlib.crc32(content) == checksum + content.unpack('N*') end - def context(io, total_len, headers_len, prelude_buffer) - # buffer rest of the message except prelude length - # including context and total message checksum - buffer = BytesBuffer.new(io.read(total_len - PRELUDE_LENGTH)) - context_len = total_len - OVERHEAD_LENGTH - - prelude_buffer.rewind - checksum = Zlib.crc32(prelude_buffer.read << buffer.read(context_len)) - unless checksum == unpack_uint32(buffer) - raise Errors::MessageChecksumError - end - - buffer.rewind + def decode_context(content, header_length) + encoded_header, encoded_payload = content.unpack("a#{header_length}a*") [ - extract_headers(BytesBuffer.new(buffer.read(headers_len))), - extract_payload(BytesBuffer.new(buffer.read(context_len - headers_len))) + extract_headers(encoded_header), + extract_payload(encoded_payload) ] end def extract_headers(buffer) + scanner = buffer headers = {} - until buffer.eof? + until scanner.bytesize == 0 # header key - key_len = unpack_uint8(buffer) - key = buffer.read(key_len) + key_length, scanner = scanner.unpack('Ca*') + key, scanner = scanner.unpack("a#{key_length}a*") # header value - value_type = Types.types[unpack_uint8(buffer)] - unpack_pattern, value_len, _ = Types.pattern[value_type] - if !!unpack_pattern == unpack_pattern + type_index, scanner = scanner.unpack('Ca*') + value_type = Types.types[type_index] + unpack_pattern, value_length = Types.pattern[value_type] + value = if !!unpack_pattern == unpack_pattern # boolean types won't have value specified - value = unpack_pattern + unpack_pattern else - value_len = unpack_uint16(buffer) unless value_len - value = unpack_pattern ? - buffer.read(value_len).unpack(unpack_pattern)[0] : - buffer.read(value_len) + value_length, scanner = scanner.unpack('S>a*') unless value_length + unpacked_value, scanner = scanner.unpack("#{unpack_pattern || "a#{value_length}"}a*") + unpacked_value end headers[key] = HeaderValue.new( format: @format, value: value, @@ -209,42 +194,25 @@ ) end headers end - def extract_payload(buffer) - buffer.bytesize <= ONE_MEGABYTE ? - payload_stringio(buffer) : - payload_tempfile(buffer) + def extract_payload(encoded) + encoded.bytesize <= ONE_MEGABYTE ? + payload_stringio(encoded) : + payload_tempfile(encoded) end - def payload_stringio(buffer) - StringIO.new(buffer.read) + def payload_stringio(encoded) + StringIO.new(encoded) end - def payload_tempfile(buffer) + def payload_tempfile(encoded) payload = Tempfile.new payload.binmode - until buffer.eof? - payload.write(buffer.read(ONE_MEGABYTE)) - end + payload.write(encoded) payload.rewind payload end - - # overhead decode helpers - - def unpack_uint32(buffer) - buffer.read(4).unpack('N')[0] - end - - def unpack_uint16(buffer) - buffer.read(2).unpack('S>')[0] - end - - def unpack_uint8(buffer) - buffer.readbyte.unpack('C')[0] - end end - end end