# Copyright 2011-2012 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

module AWS
  class SQS

    # Represents an Amazon SQS Queue.
    #
    # @example Sending a message
    #   msg = queue.send_message("HELLO")
    #   puts "Sent message: #{msg.id}"
    #
    # @example Polling for messages indefinitely
    #   queue.poll do |msg|
    #     puts "Got message: #{msg.body}"
    #   end
    #
    class Queue

      # The default number of seconds to wait between polling requests for
      # new messages.  
      DEFAULT_POLL_INTERVAL = 1

      include Core::Model

      # @return [String] The queue URL.
      attr_reader :url

      # @private
      def initialize(url, opts = {})
        @url = url
        super
      end

      # Deletes the queue, regardless of whether it is empty.  
      #
      # When you delete a queue, the deletion process takes up to 60
      # seconds. Requests you send involving that queue during the
      # 60 seconds might succeed. For example, calling
      # {#send_message} might succeed, but after the 60 seconds, the
      # queue and that message you sent no longer exist. 
      #
      # Also, when you delete a queue, you must wait at least 60 seconds 
      # before creating a queue with the same name.
      # @return [nil]
      def delete
        client.delete_queue(:queue_url => url)
        nil
      end

      # Represents a message sent using {Queue#send_message}.
      class SentMessage

        # @return [String] Returns the message ID.
        attr_accessor :message_id

        alias_method :id, :message_id

        # @return [String] Returns an MD5 digest of the message body 
        #   string.  You can use this to verify that SQS received your
        #   message correctly.
        attr_accessor :md5

      end

      # Delivers a message to this queue.
      #
      # @param [String] body The message to send.  The maximum
      #   allowed message size is 64 KB.  The message may only
      #   contain Unicode characters from the following list,
      #   according to the W3C XML specification (for more
      #   information, go to
      #   http://www.w3.org/TR/REC-xml/#charsets).  If you send any
      #   characters not included in the list, your request will be
      #   rejected.
      #
      #   * #x9
      #   * #xA
      #   * #xD
      #   * #x20 to #xD7FF
      #   * #xE000 to #xFFFD
      #   * #x10000 to #x10FFFF
      #
      # @param [Hash] options
      #
      # @option options [Integer] :delay_seconds The number of seconds to 
      #   delay the message. The message will become available for
      #   processing after the delay time has passed.
      #   If you don't specify a value, the default value for the 
      #   queue applies.  Should be from 0 to 900 (15 mins).
      #
      # @return [SentMessage] An object containing information about
      #   the message that was sent.
      #
      def send_message body, options = {}

        client_opts = options.dup
        client_opts[:queue_url] = url
        client_opts[:message_body] = body

        response = client.send_message(client_opts)

        msg = SentMessage.new
        msg.message_id = response.message_id
        msg.md5 = response.md5_of_message_body
        msg

      end

      # Retrieves one or more messages.  When a block is given, each
      # message is yielded to the block and then deleted as long as
      # the block exits normally.  When no block is given, you must
      # delete the message yourself using {ReceivedMessage#delete}.
      #
      # @note Due to the distributed nature of the queue, a weighted
      #   random set of machines is sampled on a ReceiveMessage
      #   call. That means only the messages on the sampled machines
      #   are returned. If the number of messages in the queue is
      #   small (less than 1000), it is likely you will get fewer
      #   messages than you requested per call to
      #   {#receive_message}. If the number of messages in the queue
      #   is extremely small, you might not receive any messages.
      #   To poll continually for messages, use the {#poll} method,
      #   which automatically retries the request after a
      #   configurable delay.
      #
      # @param [Hash] opts Options for receiving messages.
      #
      # @option opts [Integer] :limit The maximum number of messages
      #   to receive.  By default this is 1, and the return value is
      #   a single message object.  If this options is specified and
      #   is not 1, the return value is an array of message objects;
      #   however, the array may contain fewer objects than you
      #   requested.  Valid values: integers from 1 to 10.
      #
      #   Not necessarily all the messages in the queue are returned
      #   (for more information, see the preceding note about
      #   machine sampling).
      #
      # @option opts [Integer] :visibilitiy_timeout The duration (in
      #   seconds) that the received messages are hidden from
      #   subsequent retrieve requests.  Valid values: integer from
      #   0 to 43200 (maximum 12 hours)
      #
      # @option opts [Array<Symbol, String>] :attributes The
      #   attributes to populate in each received message.  Valid values:
      #
      #   * +:all+ (to populate all attributes)
      #   * +:sender_id+
      #   * +:sent_at+
      #   * +:receive_count+
      #   * +:first_received_at+
      #
      #   See {ReceivedMessage} for documentation on each
      #   attribute's meaning.
      #
      # @yieldparam [ReceivedMessage] message Each message that was received.
      #
      # @return [ReceivedMessage] Returns the received message (or messages)
      #   only if a block is not given to this method.
      #
      def receive_message(opts = {}, &block)
        resp = client.receive_message(receive_opts(opts))

        messages = resp.messages.map do |m|
          ReceivedMessage.new(self, m.message_id, m.receipt_handle,
                              :body => m.body,
                              :md5 => m.md5_of_body,
                              :attributes => m.attributes)
        end

        if block
          call_message_block(messages, block)
        elsif opts[:limit] && opts[:limit] != 1
          messages
        else
          messages.first
        end
      end
      alias_method :receive_messages, :receive_message

      # Polls continually for messages.  For example, you can use
      # this to poll indefinitely:
      #
      #  queue.poll { |msg| puts msg.body }
      #
      # Or, to poll indefinitely for the first message and then
      # continue polling until no message is received for a period
      # of at least ten seconds:
      #
      #  queue.poll(:initial_timeout => false,
      #             :idle_timeout => 10) { |msg| puts msg.body }
      #
      # As with the block form of {#receive_message}, this method
      # automatically deletes the message then the block exits
      # normally.
      #
      # @yieldparam [ReceivedMessage] message Each message that was received.
      #
      # @param [Hash] opts Options for polling.
      #
      # @option opts [Float, Integer] :poll_interval The number of
      #   seconds to wait before retrying when no messages are
      #   received.  The default is 1 second.
      #
      # @option opts [Integer] :idle_timeout The maximum number of
      #   seconds to spend polling while no messages are being
      #   returned.  By default this method polls indefinitely
      #   whether messages are received or not.
      #
      # @option opts [Integer] :initial_timeout The maximum number
      #   of seconds to spend polling before the first message is
      #   received.  This option defaults to the value of
      #   +:idle_timeout+.  You can specify +false+ to poll
      #   indefinitely for the first message when +:idle_timeout+ is
      #   set.
      #
      # @option opts [Integer] :batch_size The maximum number of
      #   messages to retrieve in a single request.  By default
      #   messages are received one at a time.  Valid values:
      #   integers from 1 to 10.
      #
      # @option opts [Integer] :visibilitiy_timeout The duration (in
      #   seconds) that the received messages are hidden from
      #   subsequent retrieve requests.  Valid values: integer from
      #   0 to 43200 (maximum 12 hours)
      #
      # @option opts [Array<Symbol, String>] :attributes The
      #   attributes to populate in each received message.  Valid values:
      #
      #   * +:all+ (to populate all attributes)
      #   * +:sender_id+
      #   * +:sent_at+
      #   * +:receive_count+
      #   * +:first_received_at+
      #
      #   See {ReceivedMessage} for documentation on each
      #   attribute's meaning.
      #
      # @return [nil]
      def poll(opts = {}, &block)
        poll_interval = opts[:poll_interval] || DEFAULT_POLL_INTERVAL
        opts[:limit] = opts.delete(:batch_size) if
          opts.key?(:batch_size)

        last_message_at = Time.now
        got_first = false
        loop do
          got_msg = false
          receive_messages(opts) do |message|
            got_msg = got_first = true
            last_message_at = Time.now
            yield(message)
          end
          unless got_msg
            Kernel.sleep(poll_interval) unless poll_interval == 0
            return if hit_timeout?(got_first, last_message_at, opts)
          end
        end
        nil
      end

      # @return [Integer] The approximate number of visible messages
      #   in a queue.  For more information, see {Resources Required
      #   to Process
      #   Messages}[http://docs.amazonwebservices.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/IntroductionArticle.html#ApproximateNumber]
      #   in the Amazon SQS Developer Guide.
      def approximate_number_of_messages
        get_attribute("ApproximateNumberOfMessages").to_i
      end
      alias_method :visible_messages, :approximate_number_of_messages

      # @return [Integer] The approximate number of messages that
      #   are not timed-out and not deleted.  For more information,
      #   see {Resources Required to Process
      #   Messages}[http://docs.amazonwebservices.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/IntroductionArticle.html#ApproximateNumber]
      #   in the Amazon SQS Developer Guide.
      def approximate_number_of_messages_not_visible
        get_attribute("ApproximateNumberOfMessagesNotVisible").to_i
      end
      alias_method :invisible_messages, :approximate_number_of_messages_not_visible

      # @return [Integer] Returns the visibility timeout for the
      #   queue. For more information about visibility timeout, see
      #   {Visibility
      #   Timeout}[http://docs.amazonwebservices.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/IntroductionArticle.html#AboutVT]
      #   in the Amazon SQS Developer Guide.
      def visibility_timeout
        get_attribute("VisibilityTimeout").to_i
      end

      # Sets the visibility timeout for the queue.
      #
      # @param [Integer] timeout The length of time (in seconds)
      #   that a message received from a queue will be invisible to
      #   other receiving components when they ask to receive
      #   messages.  Valid values: integers from 0 to 43200 (12
      #   hours).
      #
      # @return Returns the value passed as a timeout.
      def visibility_timeout=(timeout)
        set_attribute("VisibilityTimeout", timeout.to_s)
        timeout
      end

      # @return [Time] The time when the queue was created.
      def created_timestamp
        Time.at(get_attribute("CreatedTimestamp").to_i)
      end

      # @return [Time] The time when the queue was last changed.
      def last_modified_timestamp
        Time.at(get_attribute("LastModifiedTimestamp").to_i)
      end

      # @return [Integer] The limit of how many bytes a message can
      #   contain before Amazon SQS rejects it.
      def maximum_message_size
        get_attribute("MaximumMessageSize").to_i
      end

      # Sets the maximum message size for the queue.
      #
      # @param [Integer] size The limit of how many bytes a message
      #   can contain before Amazon SQS rejects it.  This must be an
      #   integer from 1024 bytes (1KB) up to 65536 bytes
      #   (64KB). The default for this attribute is 8192 (8KB).
      # @return Retuns the passed size argument.
      def maximum_message_size=(size)
        set_attribute("MaximumMessageSize", size.to_s)
      end

      # @return [Integer] The number of seconds Amazon SQS retains a
      #   message.
      def message_retention_period
        get_attribute("MessageRetentionPeriod").to_i
      end

      # Sets the message retention period for the queue
      #
      # @param [Integer] period The number of seconds Amazon SQS
      #   retains a message.  Must be an integer from 3600 (1 hour)
      #   to 1209600 (14 days). The default for this attribute is
      #   345600 (4 days).
      # @return Returns the passed period argument.
      def message_retention_period=(period)
        set_attribute("MessageRetentionPeriod", period.to_s)
        period
      end

      # @return [Integer] Gets the current default delay for messages sent 
      #   to the queue.
      def delay_seconds
        get_attribute("DelaySeconds").to_i
      end

      # Sets the default delay for messages sent to the queue.
      # @param [Integer] seconds How many seconds a message will be delayed.
      def delay_seconds= seconds
        set_attribute("DelaySeconds", seconds.to_s)
      end

      # @return [Integer] Returns an approximate count of messages delayed.
      def approximate_number_of_messages_delayed
        get_attribute("ApproximateNumberOfMessagesDelayed").to_i
      end

      # @return [String] The queue's Amazon resource name (ARN).
      def arn
        @arn ||= get_attribute("QueueArn")
      end

      # @return [Boolean] True if the queue exists.
      #
      # @note This may raise an exception if you don't have
      #   permission to access the queue attributes.  Also, it may
      #   return true for up to 60 seconds after a queue has been
      #   deleted.
      def exists?
        client.get_queue_attributes(:queue_url => url,
                                    :attribute_names => ["QueueArn"])
      rescue Errors::NonExistentQueue, Errors::InvalidAddress
        false
      else
        true
      end

      # @private
      module PolicyProxy

        attr_accessor :queue

        def change
          yield(self)
          queue.policy = self
        end

        def delete
          queue.client.send(:set_attribute, 'Policy', '')
        end

      end

      # @return [Policy] Returns the current queue policy if there is one.
      #   Returns +nil+ otherwise.
      def policy
        if policy_json = get_attribute('Policy')
          policy = SQS::Policy.from_json(policy_json)
          policy.extend(PolicyProxy)
          policy.queue = self
          policy
        else
          nil
        end
      end
      
      # Set the policy on this queue.
      #
      # If you pass nil or an empty string then it will have the same 
      # effect as deleting the policy.
      #
      # @param policy The policy to set.  This policy can be a {Policy} object,
      #   a json policy string, or any other object that responds with a policy
      #   string when it received #to_json.
      #
      # @return [nil]
      #
      def policy= policy
        policy_string = case policy
        when nil, '' then ''
        when String  then policy
        else policy.to_json
        end
        set_attribute('Policy', policy_string)
        nil
      end

      # Sends a batch of up to 10 messages in a single request.
      # 
      #   queue.send_messages('message-1', 'message-2')
      #
      # You can also set an optional delay for all of the messages:
      #
      #   # delay all messages 1 hour
      #   queue.batch_send(msg1, msg2, :delay_seconds => 3600)
      #
      # If you need to set a custom delay for each message you can pass
      # hashes:
      #
      #   messages = []
      #   messages << { :message_body => 'msg1', :delay_seconds => 60 }
      #   messages << { :message_body => 'msg2', :delay_seconds => 30 }
      #
      #   queue.batch_send(messages)
      #
      # @param [String,Hash] messages A list of messages.  Each message
      #   should be a string, or a hash with a +:message_body+,
      #   and optionally +:delay_seconds+.
      #
      # @raise [Errors::BatchSendError] Raises this error when one or more
      #   of the messages failed to send, but others did.  On the raised
      #   object you can access a list of the messages that failed, and
      #   a list of messages that succeeded.
      #
      # @return [Array<SentMessage>] Returns an array of sent message objects.
      #   Each object responds to #message_id and #md5_of_message_body.
      #   The message id is generated by Amazon SQS.
      #
      def batch_send *messages

        entries = messages.flatten

        unless entries.first.is_a?(Hash)
          options = entries.last.is_a?(Hash) ? entries.pop : {}
          entries = entries.collect{|msg| { :message_body => msg } }
          if delay = options[:delay_seconds]
            entries.each {|entry| entry[:delay_seconds] = delay }
          end
        end

        entries.each_with_index {|entry,n| entry[:id] = n.to_s }

        client_opts = {}
        client_opts[:queue_url] = url
        client_opts[:entries] = entries

        response = client.send_message_batch(client_opts)

        failed = batch_failures(entries, response)

        sent = response.successful.collect do |sent|
          msg = SentMessage.new
          msg.message_id = sent.message_id
          msg.md5 = sent.md5_of_message_body
          msg
        end

        raise Errors::BatchSendError.new(sent, failed) unless failed.empty?

        sent
        
      end

      # @param [ReceivedMessage,String] message A list of up to 10 messages 
      #   to delete.  Each message should be a {ReceivedMessage} object 
      #   or a received message handle (string).
      #
      # @raise [Errors::BatchDeleteSend] Raised when one or more of the
      #   messages failed to delete.  The raised error has a list
      #   of the failures.
      #
      # @return [nil]
      #   
      def batch_delete *messages

        entries = []
        messages.flatten.each_with_index do |msg,n|
          handle = msg.is_a?(ReceivedMessage) ? msg.handle : msg
          entries << { :id => n.to_s, :receipt_handle => handle }
        end

        response = client.delete_message_batch(
          :queue_url => url, :entries => entries)

        failures = batch_failures(entries, response)

        raise Errors::BatchDeleteError.new(failures) unless failures.empty?

        nil

      end

      # @overload batch_change_message_visibility(visibility_timeout, *messages)
      #
      #   Accepts a single +:visibility_timeout+ value and a list of 
      #   messages ({ReceivedMessage} objects or receipt handle strings).  
      #   This form of the method is useful when you want to set the same
      #   timeout value for each message.
      #
      #     queue.bacth_change_message_visibility(10, messages)
      #
      #   @param [Integer] visibility_timeout The new value for the message's 
      #     visibility timeout (in seconds).
      #
      #   @param [ReceivedMessage,String] message A list of up to 10 messages
      #     to change the visibility timeout for.
      #
      #   @raise [BatchChangeVisibilityError] Raises this error when one
      #     or more of the messages failed the visibility update.
      #
      #   @return [nil]
      #
      # @overload batch_change_message_visibility(*messages_with_timeouts)
      #
      #   Accepts a list of hashes.  Each hash should provide the visibility
      #   timeout and message (a {ReceivedMessage} object or the recipt handle
      #   string).  
      #
      #   Use this form when each message needs a different visiblity timeout.
      #
      #     messages = []
      #     messages << { :message => 'handle1', :visibility_timeout => 5 }
      #     messages << { :message => 'handle2', :visibility_timeout => 10 }
      #
      #     queue.bacth_change_message_visibility(*messages)
      #
      #   @param [Hash] message A list hashes, each with a +:visibility_timeout+
      #     and a +:message+.  
      #
      #   @raise [BatchChangeVisibilityError] Raises this error when one
      #     or more of the messages failed the visibility update.
      #
      #   @return [nil]
      #
      def batch_change_visibility *args
        
        args = args.flatten
        
        if args.first.is_a?(Integer)
          timeout = args.shift
          messages = args.collect{|m| [m, timeout] }
        else
          messages = args.collect{|m| [m[:message], m[:visibility_timeout]] }
        end

        entries = []
        messages.each do |msg,timeout|
          handle = msg.is_a?(ReceivedMessage) ? msg.handle : msg
          entries << {
            :id => entries.size.to_s, 
            :receipt_handle => handle,
            :visibility_timeout => timeout,
          }
        end

        response = client.change_message_visibility_batch(
          :queue_url => url, :entries => entries)

        failures = batch_failures(entries, response)

        raise Errors::BatchChangeVisibilityError.new(failures) unless 
          failures.empty?

        nil

      end

      # @return [Boolean] Returns true if the other queue has the same
      #   url.
      def ==(other)
        other.kind_of?(Queue) and other.url == url
      end
      alias_method :eql?, :==

      # @private
      def inspect
        "<#{self.class}:#{url}>"
      end

      protected
      def batch_failures entries, response
        response.failed.inject([]) do |failures, failure|

          entry = entries.find{|e| e[:id] == failure.id }

          details = {
            :error_code => failure.code,
            :error_message => failure.message,
            :sender_fault => failure.sender_fault?,
          }

          if handle = entry[:receipt_handle]
            details[:receipt_handle] = handle
          end

          failures << details

        end
      end

      # @private
      protected
      def hit_timeout?(got_first, last_message_at, opts)
        initial_timeout = opts[:initial_timeout]
        idle_timeout = opts[:idle_timeout]

        timeout = (got_first ||
                   # if initial_timeout is false (as opposed
                   # to nil) then we skip the branch and poll
                   # indefinitely until the first message
                   # comes
                   (!initial_timeout && initial_timeout != false) ?
                   idle_timeout :
                   initial_timeout) and
          Time.now - last_message_at > timeout
      end

      # @private
      protected
      def receive_opts(opts)
        receive_opts = { :queue_url => url }
        receive_opts[:visibility_timeout] = opts[:visibility_timeout] if
          opts[:visibility_timeout]
        receive_opts[:max_number_of_messages] = opts[:limit] if
          opts[:limit]
        if names = opts[:attributes]
          receive_opts[:attribute_names] = names.map do |name|
            name = ReceivedMessage::ATTRIBUTE_ALIASES[name.to_sym] if
              ReceivedMessage::ATTRIBUTE_ALIASES.key?(name.to_sym)
            name = Core::Inflection.class_name(name.to_s) if name.kind_of?(Symbol)
            name
          end
        end
        receive_opts
      end

      # @private
      protected
      def call_message_block(messages, block)
        result = nil
        messages.each do |message|
          begin
            result = block.call(message)
          rescue Exception => e
            raise
          else
            message.delete
          end
        end
        result
      end

      # @private
      protected
      def get_attribute(name)
        resp = client.get_queue_attributes(:queue_url => url,
                                           :attribute_names =>
                                           [name, "QueueArn"].uniq)
        @arn ||= resp.attributes["QueueArn"]
        resp.attributes[name]
      end

      # @private
      protected
      def set_attribute(name, value)
        client.set_queue_attributes(:queue_url => url,
                                    :attribute => {
                                      :name => name,
                                      :value => value
                                    })
      end

    end

  end
end