lib/amqp/client/properties.rb in amqp-client-1.0.0 vs lib/amqp/client/properties.rb in amqp-client-1.0.1
- old
+ new
@@ -1,203 +1,236 @@
# frozen_string_literal: true
require_relative "./table"
module AMQP
- # Encode/decode AMQP Properties
- Properties = Struct.new(:content_type, :content_encoding, :headers, :delivery_mode, :priority, :correlation_id,
- :reply_to, :expiration, :message_id, :timestamp, :type, :user_id, :app_id,
- keyword_init: true) do
- def encode
- flags = 0
- arr = [flags]
- fmt = String.new("S>")
+ class Client
+ # Encode/decode AMQP Properties
+ # @!attribute content_type
+ # @return [String] Content type of the message body
+ # @!attribute content_encoding
+ # @return [String] Content encoding of the body
+ # @!attribute headers
+ # @return [Hash<String, Object>] Custom headers
+ # @!attribute delivery_mode
+ # @return [Integer] 2 for persisted message, transient messages for all other values
+ # @!attribute priority
+ # @return [Integer] A priority of the message (between 0 and 255)
+ # @!attribute correlation_id
+ # @return [Integer] A correlation id, most often used used for RPC communication
+ # @!attribute reply_to
+ # @return [String] Queue to reply RPC responses to
+ # @!attribute expiration
+ # @return [Integer, String] Number of seconds the message will stay in the queue
+ # @!attribute message_id
+ # @return [String]
+ # @!attribute timestamp
+ # @return [Date] User-definable, but often used for the time the message was originally generated
+ # @!attribute type
+ # @return [String] User-definable, but can can indicate what kind of message this is
+ # @!attribute user_id
+ # @return [String] User-definable, but can be used to verify that this is the user that published the message
+ # @!attribute app_id
+ # @return [String] User-definable, but often indicates which app that generated the message
+ Properties = Struct.new(:content_type, :content_encoding, :headers, :delivery_mode, :priority, :correlation_id,
+ :reply_to, :expiration, :message_id, :timestamp, :type, :user_id, :app_id,
+ keyword_init: true) do
+ # Encode properties into a byte array
+ # @return [String] byte array
+ def encode
+ flags = 0
+ arr = [flags]
+ fmt = StringIO.new(String.new("S>", capacity: 35))
+ fmt.pos = 2
- if content_type
- content_type.is_a?(String) || raise(ArgumentError, "content_type must be a string")
+ if content_type
+ content_type.is_a?(String) || raise(ArgumentError, "content_type must be a string")
- flags |= (1 << 15)
- arr << content_type.bytesize << content_type
- fmt << "Ca*"
- end
+ flags |= (1 << 15)
+ arr << content_type.bytesize << content_type
+ fmt << "Ca*"
+ end
- if content_encoding
- content_encoding.is_a?(String) || raise(ArgumentError, "content_encoding must be a string")
+ if content_encoding
+ content_encoding.is_a?(String) || raise(ArgumentError, "content_encoding must be a string")
- flags |= (1 << 14)
- arr << content_encoding.bytesize << content_encoding
- fmt << "Ca*"
- end
+ flags |= (1 << 14)
+ arr << content_encoding.bytesize << content_encoding
+ fmt << "Ca*"
+ end
- if headers
- headers.is_a?(Hash) || raise(ArgumentError, "headers must be a hash")
+ if headers
+ headers.is_a?(Hash) || raise(ArgumentError, "headers must be a hash")
- flags |= (1 << 13)
- tbl = Table.encode(headers)
- arr << tbl.bytesize << tbl
- fmt << "L>a*"
- end
+ flags |= (1 << 13)
+ tbl = Table.encode(headers)
+ arr << tbl.bytesize << tbl
+ fmt << "L>a*"
+ end
- if delivery_mode
- delivery_mode.is_a?(Integer) || raise(ArgumentError, "delivery_mode must be an int")
- delivery_mode.between?(0, 2) || raise(ArgumentError, "delivery_mode must be be between 0 and 2")
+ if delivery_mode
+ delivery_mode.is_a?(Integer) || raise(ArgumentError, "delivery_mode must be an int")
+ delivery_mode.between?(0, 2) || raise(ArgumentError, "delivery_mode must be be between 0 and 2")
- flags |= (1 << 12)
- arr << delivery_mode
- fmt << "C"
- end
+ flags |= (1 << 12)
+ arr << delivery_mode
+ fmt << "C"
+ end
- if priority
- priority.is_a?(Integer) || raise(ArgumentError, "priority must be an int")
- flags |= (1 << 11)
- arr << priority
- fmt << "C"
- end
+ if priority
+ priority.is_a?(Integer) || raise(ArgumentError, "priority must be an int")
+ flags |= (1 << 11)
+ arr << priority
+ fmt << "C"
+ end
- if correlation_id
- priority.is_a?(String) || raise(ArgumentError, "correlation_id must be a string")
+ if correlation_id
+ priority.is_a?(String) || raise(ArgumentError, "correlation_id must be a string")
- flags |= (1 << 10)
- arr << correlation_id.bytesize << correlation_id
- fmt << "Ca*"
- end
+ flags |= (1 << 10)
+ arr << correlation_id.bytesize << correlation_id
+ fmt << "Ca*"
+ end
- if reply_to
- reply_to.is_a?(String) || raise(ArgumentError, "reply_to must be a string")
+ if reply_to
+ reply_to.is_a?(String) || raise(ArgumentError, "reply_to must be a string")
- flags |= (1 << 9)
- arr << reply_to.bytesize << reply_to
- fmt << "Ca*"
- end
+ flags |= (1 << 9)
+ arr << reply_to.bytesize << reply_to
+ fmt << "Ca*"
+ end
- if expiration
- self.expiration = expiration.to_s if expiration.is_a?(Integer)
- expiration.is_a?(String) || raise(ArgumentError, "expiration must be a string or integer")
+ if expiration
+ self.expiration = expiration.to_s if expiration.is_a?(Integer)
+ expiration.is_a?(String) || raise(ArgumentError, "expiration must be a string or integer")
- flags |= (1 << 8)
- arr << expiration.bytesize << expiration
- fmt << "Ca*"
- end
+ flags |= (1 << 8)
+ arr << expiration.bytesize << expiration
+ fmt << "Ca*"
+ end
- if message_id
- message_id.is_a?(String) || raise(ArgumentError, "message_id must be a string")
+ if message_id
+ message_id.is_a?(String) || raise(ArgumentError, "message_id must be a string")
- flags |= (1 << 7)
- arr << message_id.bytesize << message_id
- fmt << "Ca*"
- end
+ flags |= (1 << 7)
+ arr << message_id.bytesize << message_id
+ fmt << "Ca*"
+ end
- if timestamp
- timestamp.is_a?(Integer) || timestamp.is_a?(Time) || raise(ArgumentError, "timestamp must be an Integer or a Time")
+ if timestamp
+ timestamp.is_a?(Integer) || timestamp.is_a?(Time) || raise(ArgumentError, "timestamp must be an Integer or a Time")
- flags |= (1 << 6)
- arr << timestamp.to_i
- fmt << "Q>"
- end
+ flags |= (1 << 6)
+ arr << timestamp.to_i
+ fmt << "Q>"
+ end
- if type
- type.is_a?(String) || raise(ArgumentError, "type must be a string")
+ if type
+ type.is_a?(String) || raise(ArgumentError, "type must be a string")
- flags |= (1 << 5)
- arr << type.bytesize << type
- fmt << "Ca*"
- end
+ flags |= (1 << 5)
+ arr << type.bytesize << type
+ fmt << "Ca*"
+ end
- if user_id
- user_id.is_a?(String) || raise(ArgumentError, "user_id must be a string")
+ if user_id
+ user_id.is_a?(String) || raise(ArgumentError, "user_id must be a string")
- flags |= (1 << 4)
- arr << user_id.bytesize << user_id
- fmt << "Ca*"
- end
+ flags |= (1 << 4)
+ arr << user_id.bytesize << user_id
+ fmt << "Ca*"
+ end
- if app_id
- app_id.is_a?(String) || raise(ArgumentError, "app_id must be a string")
+ if app_id
+ app_id.is_a?(String) || raise(ArgumentError, "app_id must be a string")
- flags |= (1 << 3)
- arr << app_id.bytesize << app_id
- fmt << "Ca*"
+ flags |= (1 << 3)
+ arr << app_id.bytesize << app_id
+ fmt << "Ca*"
+ end
+
+ arr[0] = flags
+ arr.pack(fmt.string)
end
- arr[0] = flags
- arr.pack(fmt)
- end
-
- def self.decode(bytes)
- h = new
- flags = bytes.unpack1("S>")
- pos = 2
- if (flags & 0x8000).positive?
- len = bytes[pos].ord
- pos += 1
- h[:content_type] = bytes.byteslice(pos, len).force_encoding("utf-8")
- pos += len
+ # Decode a byte array
+ # @return [Properties]
+ def self.decode(bytes)
+ h = new
+ flags = bytes.unpack1("S>")
+ pos = 2
+ if (flags & 0x8000).positive?
+ len = bytes[pos].ord
+ pos += 1
+ h[:content_type] = bytes.byteslice(pos, len).force_encoding("utf-8")
+ pos += len
+ end
+ if (flags & 0x4000).positive?
+ len = bytes[pos].ord
+ pos += 1
+ h[:content_encoding] = bytes.byteslice(pos, len).force_encoding("utf-8")
+ pos += len
+ end
+ if (flags & 0x2000).positive?
+ len = bytes.byteslice(pos, 4).unpack1("L>")
+ pos += 4
+ h[:headers] = Table.decode(bytes.byteslice(pos, len))
+ pos += len
+ end
+ if (flags & 0x1000).positive?
+ h[:delivery_mode] = bytes[pos].ord
+ pos += 1
+ end
+ if (flags & 0x0800).positive?
+ h[:priority] = bytes[pos].ord
+ pos += 1
+ end
+ if (flags & 0x0400).positive?
+ len = bytes[pos].ord
+ pos += 1
+ h[:correlation_id] = bytes.byteslice(pos, len).force_encoding("utf-8")
+ pos += len
+ end
+ if (flags & 0x0200).positive?
+ len = bytes[pos].ord
+ pos += 1
+ h[:reply_to] = bytes.byteslice(pos, len).force_encoding("utf-8")
+ pos += len
+ end
+ if (flags & 0x0100).positive?
+ len = bytes[pos].ord
+ pos += 1
+ h[:expiration] = bytes.byteslice(pos, len).force_encoding("utf-8")
+ pos += len
+ end
+ if (flags & 0x0080).positive?
+ len = bytes[pos].ord
+ pos += 1
+ h[:message_id] = bytes.byteslice(pos, len).force_encoding("utf-8")
+ pos += len
+ end
+ if (flags & 0x0040).positive?
+ h[:timestamp] = Time.at(bytes.byteslice(pos, 8).unpack1("Q>"))
+ pos += 8
+ end
+ if (flags & 0x0020).positive?
+ len = bytes[pos].ord
+ pos += 1
+ h[:type] = bytes.byteslice(pos, len).force_encoding("utf-8")
+ pos += len
+ end
+ if (flags & 0x0010).positive?
+ len = bytes[pos].ord
+ pos += 1
+ h[:user_id] = bytes.byteslice(pos, len).force_encoding("utf-8")
+ pos += len
+ end
+ if (flags & 0x0008).positive?
+ len = bytes[pos].ord
+ pos += 1
+ h[:app_id] = bytes.byteslice(pos, len).force_encoding("utf-8")
+ end
+ h
end
- if (flags & 0x4000).positive?
- len = bytes[pos].ord
- pos += 1
- h[:content_encoding] = bytes.byteslice(pos, len).force_encoding("utf-8")
- pos += len
- end
- if (flags & 0x2000).positive?
- len = bytes.byteslice(pos, 4).unpack1("L>")
- pos += 4
- h[:headers] = Table.decode(bytes.byteslice(pos, len))
- pos += len
- end
- if (flags & 0x1000).positive?
- h[:delivery_mode] = bytes[pos].ord
- pos += 1
- end
- if (flags & 0x0800).positive?
- h[:priority] = bytes[pos].ord
- pos += 1
- end
- if (flags & 0x0400).positive?
- len = bytes[pos].ord
- pos += 1
- h[:correlation_id] = bytes.byteslice(pos, len).force_encoding("utf-8")
- pos += len
- end
- if (flags & 0x0200).positive?
- len = bytes[pos].ord
- pos += 1
- h[:reply_to] = bytes.byteslice(pos, len).force_encoding("utf-8")
- pos += len
- end
- if (flags & 0x0100).positive?
- len = bytes[pos].ord
- pos += 1
- h[:expiration] = bytes.byteslice(pos, len).force_encoding("utf-8")
- pos += len
- end
- if (flags & 0x0080).positive?
- len = bytes[pos].ord
- pos += 1
- h[:message_id] = bytes.byteslice(pos, len).force_encoding("utf-8")
- pos += len
- end
- if (flags & 0x0040).positive?
- h[:timestamp] = Time.at(bytes.byteslice(pos, 8).unpack1("Q>"))
- pos += 8
- end
- if (flags & 0x0020).positive?
- len = bytes[pos].ord
- pos += 1
- h[:type] = bytes.byteslice(pos, len).force_encoding("utf-8")
- pos += len
- end
- if (flags & 0x0010).positive?
- len = bytes[pos].ord
- pos += 1
- h[:user_id] = bytes.byteslice(pos, len).force_encoding("utf-8")
- pos += len
- end
- if (flags & 0x0008).positive?
- len = bytes[pos].ord
- pos += 1
- h[:app_id] = bytes.byteslice(pos, len).force_encoding("utf-8")
- end
- h
end
end
end