Sha256: a4307c318d9ef1b8d9dfe9f8a2af769293231ae5306a8a13498f4e09f60a6d87

Contents?: true

Size: 1.1 KB

Versions: 7

Compression:

Stored size: 1.1 KB

Contents

module Propono
  # Continuously polls the SQS queue subscribed to a topic and hands every
  # received message to a caller-supplied block.
  #
  # The `sqs` client and `config` object used below are not defined in this
  # class; presumably they are provided by the included Sqs module.
  class QueueListener

    include Sqs

    # Builds a listener for +topic_id+ and starts its polling loop.
    # This call does not return under normal operation (see #listen).
    #
    # @param topic_id the topic whose queue subscription should be read
    # @yield [message] invoked once for each message received from the queue
    def self.listen(topic_id, &message_processor)
      new(topic_id, &message_processor).listen
    end

    # @param topic_id the topic whose queue subscription should be read
    # @yield [message] stored and invoked once per received message
    def initialize(topic_id, &message_processor)
      @topic_id = topic_id
      @message_processor = message_processor
    end

    # Polls the queue forever. When a poll yields no messages, or fails,
    # waits 10 seconds before polling again; otherwise polls again at once.
    def listen
      loop do
        sleep 10 unless read_messages
      end
    end

    private

    # Requests up to 10 messages from the queue and processes them.
    #
    # Returns true when at least one message was processed, and false when
    # the queue was empty or any error occurred. Errors are logged rather
    # than raised so that the polling loop in #listen keeps running.
    def read_messages
      response = sqs.receive_message(queue_url, 'MaxNumberOfMessages' => 10)
      messages = response.body['Message']
      return false if messages.empty?

      process_messages(messages)
    rescue
      config.logger.puts "Unexpected error reading from queue #{queue_url}"
      # Explicit falsy result so #listen backs off before the next poll.
      false
    end

    # Passes each message to the processor block and then deletes it from
    # the queue. The delete happens only after the processor returns, so a
    # message whose processor raises is left on the queue.
    #
    # Always returns true.
    def process_messages(messages)
      messages.each do |message|
        @message_processor.call(message)
        # The queue URL is required alongside the receipt handle to identify
        # which queue the message is deleted from.
        sqs.delete_message(queue_url, message['ReceiptHandle'])
      end
      true
    end

    # URL of the queue backing the subscription (memoized).
    def queue_url
      @queue_url ||= subscription.queue.url
    end

    # Queue subscription for the topic, created on first use (memoized).
    def subscription
      @subscription ||= QueueSubscription.create(@topic_id)
    end
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
propono-0.5.5 lib/propono/services/queue_listener.rb
propono-0.5.4 lib/propono/services/queue_listener.rb
propono-0.5.3 lib/propono/services/queue_listener.rb
propono-0.5.2 lib/propono/services/queue_listener.rb
propono-0.5.1 lib/propono/services/queue_listener.rb
propono-0.5.0 lib/propono/services/queue_listener.rb
propono-0.4.0 lib/propono/services/queue_listener.rb