Sha256: 293abf42e4236a7a5d8f767c19da649d596c2011811be62b0de72f667df09b3a

Contents?: true

Size: 911 Bytes

Versions: 1

Compression:

Stored size: 911 Bytes

Contents

module Propono
  # Polls a queue in an endless loop, hands every received message to a
  # caller-supplied block and then deletes the message from the queue.
  #
  # The +sqs+ client and +config+ object used below are not defined in this
  # class; presumably they are provided by the included Sqs module
  # (TODO confirm against lib/propono/services/sqs.rb).
  class QueueListener

    include Sqs

    # Seconds to pause before polling again when a poll yielded nothing
    # (empty queue or an error).
    IDLE_SLEEP_SECONDS = 10

    # Number of messages requested from the queue per poll.
    MAX_MESSAGES_PER_POLL = 10

    # Convenience entry point: builds a listener and starts polling.
    # Does not return under normal operation.
    #
    # @param queue_url the queue to poll (passed straight to the sqs client)
    # @yield [message] each received message
    def self.listen(queue_url, &block)
      new(queue_url, &block).listen
    end

    # @param queue_url the queue to poll
    # @yield [message] handler invoked once per received message
    def initialize(queue_url, &block)
      @queue_url = queue_url
      @block = block
    end

    # Polls forever. Sleeps between polls only when the previous poll
    # processed nothing, so a busy queue is drained without delay.
    def listen
      loop do
        sleep IDLE_SLEEP_SECONDS unless read_messages
      end
    end

    private

    # Performs a single poll of the queue.
    #
    # Any StandardError raised while receiving or processing is logged and
    # swallowed so that the polling loop in #listen keeps running.
    #
    # @return [Boolean] true when at least one message was processed,
    #   false when the queue was empty or an error occurred
    def read_messages
      response = sqs.receive_message(@queue_url, { 'MaxNumberOfMessages' => MAX_MESSAGES_PER_POLL })
      messages = response.body['Message']
      return false if messages.empty?

      process_messages(messages)
    rescue => e
      config.logger.puts "Unexpected error reading from queue #{@queue_url}"
      # Include the cause; without it the log line above is not actionable.
      config.logger.puts "#{e.class}: #{e.message}"
      false
    end

    # Runs the handler block for each message, then deletes that message.
    # A message is deleted only after its handler returned without raising.
    #
    # @param messages [Enumerable] messages as returned by the sqs client
    # @return [true]
    def process_messages(messages)
      messages.each do |message|
        @block.call(message)
        # The queue URL is required here: a receipt handle alone does not
        # identify which queue the message belongs to.
        sqs.delete_message(@queue_url, message['ReceiptHandle'])
      end
      true
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
propono-0.3.0 lib/propono/services/queue_listener.rb