Sha256: e6be99826cad5e5f175f332b4a0a8d9ad5905d7b20524fdadbcb9618f50d2f13

Contents?: true

Size: 1.26 KB

Versions: 4

Compression:

Stored size: 1.26 KB

Contents

module Propono
  class QueueListener

    include Sqs

    def self.listen(topic_id, &message_processor)
      new(topic_id, &message_processor).listen
    end

    def initialize(topic_id, &message_processor)
      @topic_id = topic_id
      @message_processor = message_processor
    end

    def listen
      loop do
        unless read_messages
          sleep 10
        end
      end
    end

    private

    def read_messages
      #response = sqs.receive_message( queue_url, options = { 'MaxNumberOfMessages' => 10 } )
      response = sqs.receive_message( queue_url, {'MaxNumberOfMessages' => 10} )
      messages = response.body['Message']
      if messages.empty?
        false
      else
        messages.each { |msg| process_sqs_message(msg) }
      end
    rescue
      Propono.config.logger.error "Unexpected error reading from queue #{queue_url}"
      Propono.config.logger.error $!
    end

    def process_sqs_message(sqs_message)
      message = JSON.parse(sqs_message["Body"])["Message"]
      @message_processor.call(message)
      sqs.delete_message(queue_url, sqs_message['ReceiptHandle'])
    end

    def queue_url
      @queue_url ||= subscription.queue.url
    end

    def subscription
      @subscription ||= QueueSubscription.create(@topic_id)
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
propono-0.7.0 lib/propono/services/queue_listener.rb
propono-0.6.3 lib/propono/services/queue_listener.rb
propono-0.6.1 lib/propono/services/queue_listener.rb
propono-0.6.0 lib/propono/services/queue_listener.rb