#-- # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. #++ module Qpid::Proton # A Message represents an addressable quantity of data. # # ==== Message Body # # The message body can be set using the #body= method. The message will # then attempt to determine how exactly to encode the content. # # ==== Examples # # To create a message for sending: # # # send a simple text message # msg = Qpid::Proton::Message.new # msg.body = "STATE: update" # # # send a binary chunk of data # data = File.binread("/home/qpid/binfile.tar.gz") # msg = Qpid::Proton::Message.new # msg.body = Qpid::Proton::BinaryString.new(data) # class Message # @private def proton_send(sender, tag = nil) dlv = sender.delivery(tag || sender.delivery_tag) encoded = self.encode sender.stream(encoded) sender.advance dlv.settle if sender.snd_settle_mode == Link::SND_SETTLED return dlv end # Decodes a message from supplied AMQP data and returns the number # of bytes consumed. # # ==== Options # # * encoded - the encoded data # def decode(encoded) check(Cproton.pn_message_decode(@impl, encoded, encoded.length)) post_decode end def post_decode # :nodoc: # decode elements from the message @properties = {} props = Codec::Data.new(Cproton::pn_message_properties(@impl)) if props.next @properties = props.type.get(props) end @instructions = nil insts = Codec::Data.new(Cproton::pn_message_instructions(@impl)) if insts.next @instructions = insts.type.get(insts) end @annotations = nil annts = Codec::Data.new(Cproton::pn_message_annotations(@impl)) if annts.next @annotations = annts.type.get(annts) end @body = nil body = Codec::Data.new(Cproton::pn_message_body(@impl)) if body.next @body = body.type.get(body) end end # Encodes the message. def encode pre_encode size = 16 loop do error, data = Cproton::pn_message_encode(@impl, size) if error == Qpid::Proton::Error::OVERFLOW size *= 2 else check(error) return data end end end def pre_encode # :nodoc: # encode elements from the message props = Codec::Data.new(Cproton::pn_message_properties(@impl)) props.clear Codec::Mapping.for_class(@properties.class).put(props, @properties) unless @properties.empty? insts = Codec::Data.new(Cproton::pn_message_instructions(@impl)) insts.clear if !@instructions.nil? mapping = Codec::Mapping.for_class(@instructions.class) mapping.put(insts, @instructions) end annts = Codec::Data.new(Cproton::pn_message_annotations(@impl)) annts.clear if !@annotations.nil? mapping = Codec::Mapping.for_class(@annotations.class) mapping.put(annts, @annotations, :keys => :SYMBOL) end body = Codec::Data.new(Cproton::pn_message_body(@impl)) body.clear if !@body.nil? mapping = Codec::Mapping.for_class(@body.class) mapping.put(body, @body) end end # Creates a new +Message+ instance. def initialize @impl = Cproton.pn_message ObjectSpace.define_finalizer(self, self.class.finalize!(@impl)) @properties = {} @instructions = {} @annotations = {} @body = nil end def to_s tmp = Cproton.pn_string("") Cproton.pn_inspect(@impl, tmp) result = Cproton.pn_string_get(tmp) Cproton.pn_free(tmp) return result end # Invoked by garbage collection to clean up resources used # by the underlying message implementation. def self.finalize!(impl) # :nodoc: proc { Cproton.pn_message_free(impl) } end # Returns the underlying message implementation. def impl # :nodoc: @impl end # Clears the state of the +Message+. This allows a single instance of # +Message+ to be reused. # def clear Cproton.pn_message_clear(@impl) @properties.clear unless @properties.nil? @instructions.clear unless @instructions.nil? @annotations.clear unless @annotations.nil? @body = nil end # Returns the most recent error number. # def errno Cproton.pn_message_errno(@impl) end # Returns the most recent error message. # def error Cproton.pn_error_text(Cproton.pn_message_error(@impl)) end # Returns whether there is currently an error reported. # def error? !Cproton.pn_message_errno(@impl).zero? end # Sets the durable flag. # # See ::durable for more details on message durability. # # ==== Options # # * state - the durable state # def durable=(state) raise TypeError.new("state cannot be nil") if state.nil? Cproton.pn_message_set_durable(@impl, state) end # Returns the durable property. # # The durable property indicates that the emessage should be held durably # by any intermediaries taking responsibility for the message. # # ==== Examples # # msg = Qpid::Proton::Message.new # msg.durable = true # def durable Cproton.pn_message_is_durable(@impl) end # Sets the priority. # # +NOTE:+ Priority values are limited to the range [0,255]. # # ==== Options # # * priority - the priority value # def priority=(priority) raise TypeError.new("invalid priority: #{priority}") if priority.nil? || !([Float, Fixnum].include?(priority.class)) raise RangeError.new("priority out of range: #{priority}") if ((priority > 255) || (priority < 0)) Cproton.pn_message_set_priority(@impl, priority.floor) end # Returns the priority. # def priority Cproton.pn_message_get_priority(@impl) end # Sets the time-to-live for the message. # # ==== Options # # * time - the time in milliseconds # def ttl=(time) raise TypeError.new("invalid ttl: #{time}") if time.nil? || !([Float, Fixnum].include?(time.class)) raise RangeError.new("time out of range: #{time}") if ((time < 0)) Cproton.pn_message_set_ttl(@impl, time.floor) end # Returns the time-to-live, in milliseconds. # def ttl Cproton.pn_message_get_ttl(@impl) end # Sets whether this is the first time the message was acquired. # # See ::first_acquirer? for more details. # # ==== Options # # * state - true if claiming the message # def first_acquirer=(state) raise TypeError.new("invalid state: #{state}") if state.nil? || !([TrueClass, FalseClass].include?(state.class)) Cproton.pn_message_set_first_acquirer(@impl, state) end # Sets the delivery count for the message. # # See ::delivery_count for more details. # # ==== Options # # * count - the delivery count # def delivery_count=(count) raise ::ArgumentError.new("invalid count: #{count}") if count.nil? || !([Float, Fixnum].include?(count.class)) raise RangeError.new("count out of range: #{count}") if count < 0 Cproton.pn_message_set_delivery_count(@impl, count.floor) end # Returns the delivery count for the message. # # This is the number of delivery attempts for the given message. # def delivery_count Cproton.pn_message_get_delivery_count(@impl) end # Returns whether this is the first acquirer. # # def first_acquirer? Cproton.pn_message_is_first_acquirer(@impl) end # Sets the message id. # # ==== Options # # * id = the id # def id=(id) Cproton.pn_message_set_id(@impl, id) end # Returns the message id. # def id Cproton.pn_message_get_id(@impl) end # Sets the user id. # # ==== Options # # * id - the user id # def user_id=(id) Cproton.pn_message_set_user_id(@impl, id) end # Returns the user id. # def user_id Cproton.pn_message_get_user_id(@impl) end # Sets the destination address. # # ==== Options # # * address - the address # def address=(address) Cproton.pn_message_set_address(@impl, address) end # Returns the destination address. # def address Cproton.pn_message_get_address(@impl) end # Sets the subject. # # ==== Options # # * subject - the subject # def subject=(subject) Cproton.pn_message_set_subject(@impl, subject) end # Returns the subject # def subject Cproton.pn_message_get_subject(@impl) end # Sets the reply-to address. # # ==== Options # # * address - the reply-to address # def reply_to=(address) Cproton.pn_message_set_reply_to(@impl, address) end # Returns the reply-to address # def reply_to Cproton.pn_message_get_reply_to(@impl) end # Sets the correlation id. # # ==== Options # # * id - the correlation id # def correlation_id=(id) Cproton.pn_message_set_correlation_id(@impl, id) end # Returns the correlation id. # def correlation_id Cproton.pn_message_get_correlation_id(@impl) end # Sets the content type. # # ==== Options # # * content_type - the content type # def content_type=(content_type) Cproton.pn_message_set_content_type(@impl, content_type) end # Returns the content type # def content_type Cproton.pn_message_get_content_type(@impl) end # Sets the message content. # # *WARNING:* This method has been deprecated. Please use #body= instead to # set the content of a message. # # ==== Options # # * content - the content # def content=(content) Cproton.pn_message_load(@impl, content) end # Returns the message content. # # *WARNING:* This method has been deprecated. Please use #body instead to # retrieve the content of a message. # def content size = 16 loop do result = Cproton.pn_message_save(@impl, size) error = result[0] data = result[1] if error == Qpid::Proton::Error::OVERFLOW size = size * 2 else check(error) return data end end end # Sets the content encoding type. # # ==== Options # # * encoding - the content encoding # def content_encoding=(encoding) Cproton.pn_message_set_content_encoding(@impl, encoding) end # Returns the content encoding type. # def content_encoding Cproton.pn_message_get_content_encoding(@impl) end # Sets the expiration time. # # ==== Options # # * time - the expiry time # def expires=(time) raise TypeError.new("invalid expiry time: #{time}") if time.nil? raise ::ArgumentError.new("expiry time cannot be negative: #{time}") if time < 0 Cproton.pn_message_set_expiry_time(@impl, time) end # Returns the expiration time. # def expires Cproton.pn_message_get_expiry_time(@impl) end # Sets the creation time. # # ==== Options # # * time - the creation time # def creation_time=(time) raise TypeError.new("invalid time: #{time}") if time.nil? raise ::ArgumentError.new("time cannot be negative") if time < 0 Cproton.pn_message_set_creation_time(@impl, time) end # Returns the creation time. # def creation_time Cproton.pn_message_get_creation_time(@impl) end # Sets the group id. # # ==== Options # # * id - the group id # def group_id=(id) Cproton.pn_message_set_group_id(@impl, id) end # Returns the group id. # def group_id Cproton.pn_message_get_group_id(@impl) end # Sets the group sequence number. # # ==== Options # # * seq - the sequence number # def group_sequence=(seq) raise TypeError.new("invalid seq: #{seq}") if seq.nil? Cproton.pn_message_set_group_sequence(@impl, seq) end # Returns the group sequence number. # def group_sequence Cproton.pn_message_get_group_sequence(@impl) end # Sets the reply-to group id. # # ==== Options # # * id - the id # def reply_to_group_id=(id) Cproton.pn_message_set_reply_to_group_id(@impl, id) end # Returns the reply-to group id. # def reply_to_group_id Cproton.pn_message_get_reply_to_group_id(@impl) end # Returns the list of property names for associated with this message. # # ==== Examples # # msg.properties.each do |name| # end # def properties @properties end # Replaces the entire set of properties with the specified hash. # def properties=(properties) @properties = properties end # Assigns the value given to the named property. # # ==== Arguments # # * name - the property name # * value - the property value # def []=(name, value) @properties[name] = value end # Retrieves the value for the specified property name. If not found, then # it returns nil. # def [](name) @properties[name] end # Deletes the named property. # def delete_property(name) @properties.delete(name) end # Returns the instructions for this message. # def instructions @instructions end # Assigns instructions to this message. # def instructions=(instr) @instructions = instr end # Returns the annotations for this message. # def annotations @annotations end # Assigns annotations to this message. # def annotations=(annotations) @annotations = annotations end # Returns the body property of the message. # def body @body end # Assigns a new value to the body of the message. # def body=(body) @body = body end private def check(err) # :nodoc: if err < 0 raise DataError, "[#{err}]: #{Cproton.pn_message_error(@data)}" else return err end end end end