lib/aws-eventstream/decoder.rb in aws-eventstream-1.0.3 vs lib/aws-eventstream/decoder.rb in aws-eventstream-1.1.0
- old
+ new
@@ -1,11 +1,11 @@
require 'stringio'
require 'tempfile'
require 'zlib'
module Aws
- module EventStream
+ module EventStream
# This class provides methods for decoding binary input into
# one or more messages (Aws::EventStream::Message).
#
# * {#decode} - decodes messages from an IO like object responds
@@ -26,11 +26,11 @@
#
# # alternatively
# message_pool = decoder.decode(io)
# message_pool.next
# # => Aws::EventStream::Message
- #
+ #
# * {#decode_chunk} - decodes a single message from a chunk of data,
# returning the message object followed by a boolean (indicating the
# eof status of the data) in an array object
#
# ## Examples
@@ -61,46 +61,45 @@
class Decoder
include Enumerable
ONE_MEGABYTE = 1024 * 1024
+ private_constant :ONE_MEGABYTE
# number of bytes in the prelude: 4 bytes each for the total
# message length, the headers length and the crc checksum of the prelude
PRELUDE_LENGTH = 12
+ private_constant :PRELUDE_LENGTH
- # bytes of total overhead in a message, including prelude
- # and 4 bytes total message crc checksum
- OVERHEAD_LENGTH = 16
+ # 4 bytes message crc checksum
+ CRC32_LENGTH = 4
+ private_constant :CRC32_LENGTH
- # @options options [Boolean] format (true) When `false`
- # disable user-friendly formatting for message header values
+ # @param [Hash] options The initialization options.
+ # @option options [Boolean] :format (true) When `false` it
+ # disables user-friendly formatting for message header values
# including timestamp and uuid etc.
- #
def initialize(options = {})
@format = options.fetch(:format, true)
- @message_buffer = BytesBuffer.new('')
+ @message_buffer = ''
end
- # @returns [BytesBuffer]
- attr_reader :message_buffer
-
# Decodes messages from a binary stream
#
# @param [IO#read] io An IO-like object
# that responds to `#read`
#
# @yieldparam [Message] message
# @return [Enumerable<Message>, nil] Returns a new Enumerable
# containing decoded messages if no block is given
def decode(io, &block)
- io = BytesBuffer.new(io.read)
- return decode_io(io) unless block_given?
- until io.eof?
- # fetch message only
- yield(decode_message(io).first)
- end
+ raw_message = io.read
+ decoded_message = decode_message(raw_message)
+ return wrap_as_enumerator(decoded_message) unless block_given?
+ # fetch message only
+ raw_event, _eof = decoded_message
+ block.call(raw_event)
end
# Decodes a single message from a chunk of string
#
# @param [String] chunk A chunk of string to be decoded,
@@ -109,99 +108,85 @@
#
# @return [Array<Message|nil, Boolean>] Returns a pair of the single
# decoded message and a boolean; the boolean flag indicates whether this
# chunk has been fully consumed. Unused data is tracked at #message_buffer
def decode_chunk(chunk = nil)
- @message_buffer.write(chunk) if chunk
- @message_buffer.rewind
+ @message_buffer = [@message_buffer, chunk].pack('a*a*') if chunk
decode_message(@message_buffer)
end
private
- def decode_io(io)
- ::Enumerator.new {|e| e << decode_message(io) unless io.eof? }
+ # exposed via object.send for testing
+ attr_reader :message_buffer
+
+ def wrap_as_enumerator(decoded_message)
+ Enumerator.new do |yielder|
+ yielder << decoded_message
+ end
end
- def decode_message(io)
- # incomplete message prelude received, leave it in the buffer
- return [nil, true] if io.bytesize < PRELUDE_LENGTH
+ def decode_message(raw_message)
+ # incomplete message prelude received
+ return [nil, true] if raw_message.bytesize < PRELUDE_LENGTH
+ prelude, content = raw_message.unpack("a#{PRELUDE_LENGTH}a*")
+
# decode prelude
- total_len, headers_len, prelude_buffer = prelude(io)
+ total_length, header_length = decode_prelude(prelude)
# incomplete message received, leave it in the buffer
- return [nil, true] if io.bytesize < total_len
+ return [nil, true] if raw_message.bytesize < total_length
+ content, checksum, remaining = content.unpack("a#{total_length - PRELUDE_LENGTH - CRC32_LENGTH}Na*")
+ unless Zlib.crc32([prelude, content].pack('a*a*')) == checksum
+ raise Errors::MessageChecksumError
+ end
+
# decode headers and payload
- headers, payload = context(io, total_len, headers_len, prelude_buffer)
+ headers, payload = decode_context(content, header_length)
- # track extra message data in the buffer if exists
- # for #decode_chunk, io is @message_buffer
- if eof = io.eof?
- @message_buffer.clear!
- else
- @message_buffer = BytesBuffer.new(@message_buffer.read)
- end
+ @message_buffer = remaining
- [Message.new(headers: headers, payload: payload), eof]
+ [Message.new(headers: headers, payload: payload), remaining.empty?]
end
- def prelude(io)
- # buffer prelude into bytes buffer
+ def decode_prelude(prelude)
# the prelude contains the lengths of the message and of the headers,
# followed by a CRC checksum of itself
- buffer = BytesBuffer.new(io.read(PRELUDE_LENGTH))
-
- # prelude checksum takes last 4 bytes
- checksum = Zlib.crc32(buffer.read(PRELUDE_LENGTH - 4))
- unless checksum == unpack_uint32(buffer)
- raise Errors::PreludeChecksumError
- end
-
- buffer.rewind
- total_len, headers_len, _ = buffer.read.unpack('N*')
- [total_len, headers_len, buffer]
+ content, checksum = prelude.unpack("a#{PRELUDE_LENGTH - CRC32_LENGTH}N")
+ raise Errors::PreludeChecksumError unless Zlib.crc32(content) == checksum
+ content.unpack('N*')
end
- def context(io, total_len, headers_len, prelude_buffer)
- # buffer rest of the message except prelude length
- # including context and total message checksum
- buffer = BytesBuffer.new(io.read(total_len - PRELUDE_LENGTH))
- context_len = total_len - OVERHEAD_LENGTH
-
- prelude_buffer.rewind
- checksum = Zlib.crc32(prelude_buffer.read << buffer.read(context_len))
- unless checksum == unpack_uint32(buffer)
- raise Errors::MessageChecksumError
- end
-
- buffer.rewind
+ def decode_context(content, header_length)
+ encoded_header, encoded_payload = content.unpack("a#{header_length}a*")
[
- extract_headers(BytesBuffer.new(buffer.read(headers_len))),
- extract_payload(BytesBuffer.new(buffer.read(context_len - headers_len)))
+ extract_headers(encoded_header),
+ extract_payload(encoded_payload)
]
end
def extract_headers(buffer)
+ scanner = buffer
headers = {}
- until buffer.eof?
+ until scanner.bytesize == 0
# header key
- key_len = unpack_uint8(buffer)
- key = buffer.read(key_len)
+ key_length, scanner = scanner.unpack('Ca*')
+ key, scanner = scanner.unpack("a#{key_length}a*")
# header value
- value_type = Types.types[unpack_uint8(buffer)]
- unpack_pattern, value_len, _ = Types.pattern[value_type]
- if !!unpack_pattern == unpack_pattern
+ type_index, scanner = scanner.unpack('Ca*')
+ value_type = Types.types[type_index]
+ unpack_pattern, value_length = Types.pattern[value_type]
+ value = if !!unpack_pattern == unpack_pattern
# boolean types won't have value specified
- value = unpack_pattern
+ unpack_pattern
else
- value_len = unpack_uint16(buffer) unless value_len
- value = unpack_pattern ?
- buffer.read(value_len).unpack(unpack_pattern)[0] :
- buffer.read(value_len)
+ value_length, scanner = scanner.unpack('S>a*') unless value_length
+ unpacked_value, scanner = scanner.unpack("#{unpack_pattern || "a#{value_length}"}a*")
+ unpacked_value
end
headers[key] = HeaderValue.new(
format: @format,
value: value,
@@ -209,42 +194,25 @@
)
end
headers
end
- def extract_payload(buffer)
- buffer.bytesize <= ONE_MEGABYTE ?
- payload_stringio(buffer) :
- payload_tempfile(buffer)
+ def extract_payload(encoded)
+ encoded.bytesize <= ONE_MEGABYTE ?
+ payload_stringio(encoded) :
+ payload_tempfile(encoded)
end
- def payload_stringio(buffer)
- StringIO.new(buffer.read)
+ def payload_stringio(encoded)
+ StringIO.new(encoded)
end
- def payload_tempfile(buffer)
+ def payload_tempfile(encoded)
payload = Tempfile.new
payload.binmode
- until buffer.eof?
- payload.write(buffer.read(ONE_MEGABYTE))
- end
+ payload.write(encoded)
payload.rewind
payload
end
-
- # overhead decode helpers
-
- def unpack_uint32(buffer)
- buffer.read(4).unpack('N')[0]
- end
-
- def unpack_uint16(buffer)
- buffer.read(2).unpack('S>')[0]
- end
-
- def unpack_uint8(buffer)
- buffer.readbyte.unpack('C')[0]
- end
end
-
end
end