Sha256: b0902f4944f6ca517bfefff265d4e54450455c045d0dd5d2d74c1963ad695c3b

Contents?: true

Size: 1.33 KB

Versions: 2

Compression:

Stored size: 1.33 KB

Contents

module Shoryuken
  class Queue
    attr_accessor :name, :client, :url

    def initialize(client, name)
      self.name   = name
      self.client = client
      self.url    = client.get_queue_url(queue_name: name).queue_url
    end

    def visibility_timeout
      client.get_queue_attributes(
        queue_url: url,
        attribute_names: ['VisibilityTimeout']
      ).attributes['VisibilityTimeout'].to_i
    end

    def delete_messages(options)
      client.delete_message_batch(options.merge(queue_url: url))
    end

    def send_message(options)
      client.send_message(sanitize_message_body(options.merge(queue_url: url)))
    end

    def send_messages(options)
      client.send_message_batch(sanitize_message_body(options.merge(queue_url: url)))
    end

    def receive_messages(options)
      client.receive_message(options.merge(queue_url: url)).
        messages.
        map { |m| Message.new(client, url, m) }
    end

    private

    def sanitize_message_body(options)
      messages = options[:entries] || [options]

      messages.each do |m|
        body = m[:message_body]
        if body.is_a?(Hash)
          m[:message_body] = JSON.dump(body)
        elsif !body.is_a? String
          fail ArgumentError, "The message body must be a String and you passed a #{body.class}"
        end
      end

      options
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
shoryuken-1.0.2 lib/shoryuken/queue.rb
shoryuken-1.0.1 lib/shoryuken/queue.rb