Sha256: 7e771163d74b2471ed9e66f6852a9e3ab994dbdf2ddd5908a68a1b8e77b5af23

Contents?: true

Size: 1.12 KB

Versions: 3

Compression:

Stored size: 1.12 KB

Contents

module Lambdakiq
  class Queue

    attr_reader :queue_name,
                :queue_url

    def initialize(queue_name)
      @queue_name = queue_name
      @queue_url = get_queue_url
      attributes
    end

    def send_message(job, options = {})
      client.send_message send_message_params(job, options)
    end

    def attributes
      @attributes ||= client.get_queue_attributes({
        queue_url: queue_url,
        attribute_names: ['All']
      }).attributes
    end

    def redrive_policy
      @redrive_policy ||= attributes['RedrivePolicy'] ? JSON.parse(attributes['RedrivePolicy']) : nil
    end

    def max_receive_count
      redrive_policy&.dig('maxReceiveCount')&.to_i || 1
    end

    def fifo?
      queue_name.ends_with?('.fifo')
    end

    private

    def client
      Lambdakiq.client.sqs
    end

    def get_queue_url
      client.get_queue_url(queue_name: queue_name).queue_url
    end

    def send_message_params(job, options)
      { queue_url: queue_url }.merge(message_params(job, options))
    end

    def message_params(job, options)
      Message.new(self, job, options).params
    end

  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
lambdakiq-2.3.0 lib/lambdakiq/queue.rb
lambdakiq-2.2.0 lib/lambdakiq/queue.rb
lambdakiq-2.1.0 lib/lambdakiq/queue.rb