Sha256: 93ed266e2d50975dc466ed63a8001ce04fa74a47091c59dcc62a148f0bc7b691

Contents?: true

Size: 1.17 KB

Versions: 2

Compression:

Stored size: 1.17 KB

Contents

module Propono
  # Polls the queue behind a topic subscription and hands every received
  # message to a caller-supplied block.
  #
  # The queue is resolved lazily through QueueSubscription.create(topic_id)
  # the first time its URL is needed.
  class QueueListener

    include Sqs

    # Seconds to pause before polling again after an empty or failed read.
    IDLE_SLEEP_SECONDS = 10

    # Value sent as 'MaxNumberOfMessages' on each receive call.
    MAX_MESSAGES_PER_READ = 10

    # Builds a listener for +topic_id+ and starts its polling loop.
    # This call does not return under normal operation (see #listen).
    #
    # @param topic_id the topic identifier passed on to QueueSubscription.create
    # @yield [message] called once per received message with the value found
    #   under the "Message" key of the JSON-decoded message body
    def self.listen(topic_id, &message_processor)
      new(topic_id, &message_processor).listen
    end

    # @param topic_id the topic identifier passed on to QueueSubscription.create
    # @yield [message] stored and invoked later for every received message
    def initialize(topic_id, &message_processor)
      @topic_id = topic_id
      @message_processor = message_processor
    end

    # Polls the queue forever. Whenever a read yields nothing, or fails, the
    # loop pauses for IDLE_SLEEP_SECONDS so that an empty queue or a
    # persistent error does not turn into a tight loop of requests.
    def listen
      loop do
        sleep IDLE_SLEEP_SECONDS unless read_messages
      end
    end

    private

    # Fetches one batch of messages and processes each of them in order.
    #
    # @return [Boolean] true when at least one message was received and the
    #   whole batch was processed; false when the batch was empty or an error
    #   was raised (the error is logged, not re-raised)
    def read_messages
      response = sqs.receive_message(queue_url, { 'MaxNumberOfMessages' => MAX_MESSAGES_PER_READ })
      # A response without a 'Message' entry is treated like an empty batch.
      messages = response.body['Message'] || []
      return false if messages.empty?

      messages.each { |msg| process_sqs_message(msg) }
      true
    rescue => e
      Propono.config.logger.error "Unexpected error reading from queue #{queue_url}"
      Propono.config.logger.error e
      # Report failure explicitly: the logger's return value must not decide
      # whether the caller backs off before the next attempt.
      false
    end

    # Decodes a single queue message, passes its payload to the processor and
    # then deletes the message from the queue.
    #
    # The delete happens only after the processor returns, so a message whose
    # processing raises is left on the queue.
    #
    # @param sqs_message a hash-like message exposing "Body" (a JSON document
    #   with a "Message" key) and 'ReceiptHandle'
    def process_sqs_message(sqs_message)
      message = JSON.parse(sqs_message["Body"])["Message"]
      @message_processor.call(message)
      sqs.delete_message(queue_url, sqs_message['ReceiptHandle'])
    end

    # URL of the subscribed queue, memoized after the first lookup.
    def queue_url
      @queue_url ||= subscription.queue.url
    end

    # Subscription for @topic_id, created on first use and then reused.
    def subscription
      @subscription ||= QueueSubscription.create(@topic_id)
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
propono-0.8.2 lib/propono/services/queue_listener.rb
propono-0.8.0 lib/propono/services/queue_listener.rb