# Copyright 2011 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). You # may not use this file except in compliance with the License. A copy of # the License is located at # # http://aws.amazon.com/apache2.0/ # # or in the "license" file accompanying this file. This file is # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. require 'aws/inflection' require 'aws/model' require 'aws/sqs/received_message' require 'aws/sqs/received_sns_message' module AWS class SQS # Represents an Amazon SQS Queue. # # @example Sending a message # msg = queue.send_message("HELLO") # puts "Sent message: #{msg.id}" # # @example Polling for messages indefinitely # queue.poll do |msg| # puts "Got message: #{msg.body}" # end # class Queue # The default number of seconds to wait between polling requests for # new messages. DEFAULT_POLL_INTERVAL = 1 include Model # @return [String] The queue URL. attr_reader :url # @private def initialize(url, opts = {}) @url = url super end # Deletes the queue, regardless of whether it is empty. # # When you delete a queue, the deletion process takes up to 60 # seconds. Requests you send involving that queue during the # 60 seconds might succeed. For example, calling # {#send_message} might succeed, but after the 60 seconds, the # queue and that message you sent no longer exist. # # Also, when you delete a queue, you must wait at least 60 seconds # before creating a queue with the same name. # @return [nil] def delete client.delete_queue(:queue_url => url) nil end # Represents a message sent using {Queue#send_message}. class SentMessage # @return [String] Returns the message ID. attr_accessor :message_id alias_method :id, :message_id # @return [String] Returns an MD5 digest of the message body # string. You can use this to verify that SQS received your # message correctly. attr_accessor :md5 end # Delivers a message to this queue. # # @param [String] body The message to send. The maximum # allowed message size is 64 KB. The message may only # contain Unicode characters from the following list, # according to the W3C XML specification (for more # information, go to # http://www.w3.org/TR/REC-xml/#charsets). If you send any # characters not included in the list, your request will be # rejected. # # * #x9 # * #xA # * #xD # * #x20 to #xD7FF # * #xE000 to #xFFFD # * #x10000 to #x10FFFF # # @return [SentMessage] An object containing information about # the message that was sent. def send_message(body) resp = client.send_message(:queue_url => url, :message_body => body) msg = SentMessage.new msg.message_id = resp.message_id msg.md5 = resp.md5_of_message_body msg end # Retrieves one or more messages. When a block is given, each # message is yielded to the block and then deleted as long as # the block exits normally. When no block is given, you must # delete the message yourself using {ReceivedMessage#delete}. # # @note Due to the distributed nature of the queue, a weighted # random set of machines is sampled on a ReceiveMessage # call. That means only the messages on the sampled machines # are returned. If the number of messages in the queue is # small (less than 1000), it is likely you will get fewer # messages than you requested per call to # {#receive_message}. If the number of messages in the queue # is extremely small, you might not receive any messages. # To poll continually for messages, use the {#poll} method, # which automatically retries the request after a # configurable delay. # # @param [Hash] opts Options for receiving messages. # # @option opts [Integer] :limit The maximum number of messages # to receive. By default this is 1, and the return value is # a single message object. If this options is specified and # is not 1, the return value is an array of message objects; # however, the array may contain fewer objects than you # requested. Valid values: integers from 1 to 10. # # Not necessarily all the messages in the queue are returned # (for more information, see the preceding note about # machine sampling). # # @option opts [Integer] :visibilitiy_timeout The duration (in # seconds) that the received messages are hidden from # subsequent retrieve requests. Valid values: integer from # 0 to 43200 (maximum 12 hours) # # @option opts [Array<Symbol, String>] :attributes The # attributes to populate in each received message. Valid values: # # * +:all+ (to populate all attributes) # * +:sender_id+ # * +:sent_at+ # * +:receive_count+ # * +:first_received_at+ # # See {ReceivedMessage} for documentation on each # attribute's meaning. # # @yieldparam [ReceivedMessage] message Each message that was received. # # @return [ReceivedMessage] Returns the received message (or messages) # only if a block is not given to this method. # def receive_message(opts = {}, &block) resp = client.receive_message(receive_opts(opts)) messages = resp.messages.map do |m| ReceivedMessage.new(self, m.message_id, m.receipt_handle, :body => m.body, :md5 => m.md5_of_body, :attributes => m.attributes) end if block call_message_block(messages, block) elsif opts[:limit] && opts[:limit] != 1 messages else messages.first end end alias_method :receive_messages, :receive_message # Polls continually for messages. For example, you can use # this to poll indefinitely: # # queue.poll { |msg| puts msg.body } # # Or, to poll indefinitely for the first message and then # continue polling until no message is received for a period # of at least ten seconds: # # queue.poll(:initial_timeout => false, # :idle_timeout => 10) { |msg| puts msg.body } # # As with the block form of {#receive_message}, this method # automatically deletes the message then the block exits # normally. # # @yieldparam [ReceivedMessage] message Each message that was received. # # @param [Hash] opts Options for polling. # # @option opts [Float, Integer] :poll_interval The number of # seconds to wait before retrying when no messages are # received. The default is 1 second. # # @option opts [Integer] :idle_timeout The maximum number of # seconds to spend polling while no messages are being # returned. By default this method polls indefinitely # whether messages are received or not. # # @option opts [Integer] :initial_timeout The maximum number # of seconds to spend polling before the first message is # received. This option defaults to the value of # +:idle_timeout+. You can specify +false+ to poll # indefinitely for the first message when +:idle_timeout+ is # set. # # @option opts [Integer] :batch_size The maximum number of # messages to retrieve in a single request. By default # messages are received one at a time. Valid values: # integers from 1 to 10. # # @option opts [Integer] :visibilitiy_timeout The duration (in # seconds) that the received messages are hidden from # subsequent retrieve requests. Valid values: integer from # 0 to 43200 (maximum 12 hours) # # @option opts [Array<Symbol, String>] :attributes The # attributes to populate in each received message. Valid values: # # * +:all+ (to populate all attributes) # * +:sender_id+ # * +:sent_at+ # * +:receive_count+ # * +:first_received_at+ # # See {ReceivedMessage} for documentation on each # attribute's meaning. # # @return [nil] def poll(opts = {}, &block) poll_interval = opts[:poll_interval] || DEFAULT_POLL_INTERVAL opts[:limit] = opts.delete(:batch_size) if opts.key?(:batch_size) last_message_at = Time.now got_first = false loop do got_msg = false receive_messages(opts) do |message| got_msg = got_first = true last_message_at = Time.now yield(message) end unless got_msg Kernel.sleep(poll_interval) unless poll_interval == 0 return if hit_timeout?(got_first, last_message_at, opts) end end nil end # @return [Integer] The approximate number of visible messages # in a queue. For more information, see {Resources Required # to Process # Messages}[http://docs.amazonwebservices.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/IntroductionArticle.html#ApproximateNumber] # in the Amazon SQS Developer Guide. def approximate_number_of_messages get_attribute("ApproximateNumberOfMessages").to_i end alias_method :visible_messages, :approximate_number_of_messages # @return [Integer] The approximate number of messages that # are not timed-out and not deleted. For more information, # see {Resources Required to Process # Messages}[http://docs.amazonwebservices.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/IntroductionArticle.html#ApproximateNumber] # in the Amazon SQS Developer Guide. def approximate_number_of_messages_not_visible get_attribute("ApproximateNumberOfMessagesNotVisible").to_i end alias_method :invisible_messages, :approximate_number_of_messages_not_visible # @return [Integer] Returns the visibility timeout for the # queue. For more information about visibility timeout, see # {Visibility # Timeout}[http://docs.amazonwebservices.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/IntroductionArticle.html#AboutVT] # in the Amazon SQS Developer Guide. def visibility_timeout get_attribute("VisibilityTimeout").to_i end # Sets the visibility timeout for the queue. # # @param [Integer] timeout The length of time (in seconds) # that a message received from a queue will be invisible to # other receiving components when they ask to receive # messages. Valid values: integers from 0 to 43200 (12 # hours). # # @return Returns the value passed as a timeout. def visibility_timeout=(timeout) set_attribute("VisibilityTimeout", timeout.to_s) timeout end # @return [Time] The time when the queue was created. def created_timestamp Time.at(get_attribute("CreatedTimestamp").to_i) end # @return [Time] The time when the queue was last changed. def last_modified_timestamp Time.at(get_attribute("LastModifiedTimestamp").to_i) end # @return [Integer] The limit of how many bytes a message can # contain before Amazon SQS rejects it. def maximum_message_size get_attribute("MaximumMessageSize").to_i end # Sets the maximum message size for the queue. # # @param [Integer] size The limit of how many bytes a message # can contain before Amazon SQS rejects it. This must be an # integer from 1024 bytes (1KB) up to 65536 bytes # (64KB). The default for this attribute is 8192 (8KB). # @return Retuns the passed size argument. def maximum_message_size=(size) set_attribute("MaximumMessageSize", size.to_s) end # @return [Integer] The number of seconds Amazon SQS retains a # message. def message_retention_period get_attribute("MessageRetentionPeriod").to_i end # Sets the message retention period for the queue # # @param [Integer] period The number of seconds Amazon SQS # retains a message. Must be an integer from 3600 (1 hour) # to 1209600 (14 days). The default for this attribute is # 345600 (4 days). # @return Returns the passed period argument. def message_retention_period=(period) set_attribute("MessageRetentionPeriod", period.to_s) period end # @return [String] The queue's Amazon resource name (ARN). def arn @arn ||= get_attribute("QueueArn") end # @return [Boolean] True if the queue exists. # # @note This may raise an exception if you don't have # permission to access the queue attributes. Also, it may # return true for up to 60 seconds after a queue has been # deleted. def exists? client.get_queue_attributes(:queue_url => url, :attribute_names => ["QueueArn"]) rescue Errors::NonExistentQueue, Errors::InvalidAddress false else true end # @private module PolicyProxy attr_accessor :queue def change yield(self) queue.policy = self end def delete queue.client.send(:set_attribute, 'Policy', '') end end # @return [Policy] Returns the current queue policy if there is one. # Returns +nil+ otherwise. def policy if policy_json = get_attribute('Policy') policy = SQS::Policy.from_json(policy_json) policy.extend(PolicyProxy) policy.queue = self policy else nil end end # Set the policy on this queue. # # If you pass nil or an empty string then it will have the same # effect as deleting the policy. # # @param policy The policy to set. This policy can be a {Policy} object, # a json policy string, or any other object that responds with a policy # string when it received #to_json. # # @return [nil] def policy= policy policy_string = case policy when nil, '' then '' when String then policy else policy.to_json end set_attribute('Policy', policy_string) nil end # @return [Boolean] Returns true if the other queue has the same # url. def ==(other) other.kind_of?(Queue) and other.url == url end alias_method :eql?, :== # @private def inspect "<#{self.class}:#{url}>" end # @private protected def hit_timeout?(got_first, last_message_at, opts) initial_timeout = opts[:initial_timeout] idle_timeout = opts[:idle_timeout] timeout = (got_first || # if initial_timeout is false (as opposed # to nil) then we skip the branch and poll # indefinitely until the first message # comes (!initial_timeout && initial_timeout != false) ? idle_timeout : initial_timeout) and Time.now - last_message_at > timeout end # @private protected def receive_opts(opts) receive_opts = { :queue_url => url } receive_opts[:visibility_timeout] = opts[:visibility_timeout] if opts[:visibility_timeout] receive_opts[:max_number_of_messages] = opts[:limit] if opts[:limit] if names = opts[:attributes] receive_opts[:attribute_names] = names.map do |name| name = ReceivedMessage::ATTRIBUTE_ALIASES[name.to_sym] if ReceivedMessage::ATTRIBUTE_ALIASES.key?(name.to_sym) name = Inflection.class_name(name.to_s) if name.kind_of?(Symbol) name end end receive_opts end # @private protected def call_message_block(messages, block) result = nil messages.each do |message| begin result = block.call(message) rescue Exception => e raise else message.delete end end result end # @private protected def get_attribute(name) resp = client.get_queue_attributes(:queue_url => url, :attribute_names => [name, "QueueArn"].uniq) @arn ||= resp.attributes["QueueArn"] resp.attributes[name] end # @private protected def set_attribute(name, value) client.set_queue_attributes(:queue_url => url, :attribute => { :name => name, :value => value }) end end end end