Sha256: 77a836193734a036c2c9b08cc9d8d7b65b54b093fec4fdbbd98f918cec532e3d

Contents?: true

Size: 1.92 KB

Versions: 5

Compression:

Stored size: 1.92 KB

Contents

module Eventboss
  # LongPoller fetches messages from SQS using Long Polling
  # http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html
  # It starts one thread per queue (handled by Launcher)
  class LongPoller
    include Logging
    include SafeThread

    TIME_WAIT = 10

    attr_reader :id, :queue, :listener

    def initialize(launcher, bus, client, queue, listener)
      @id = "poller-#{queue.name}"
      @launcher = launcher
      @bus = bus
      @client = client
      @queue = queue
      @listener = listener
      @thread = nil
      @stop = false
    end

    def start
      @thread = safe_thread(id, &method(:run))
    end

    def terminate(wait = false)
      @stop = true
      return unless @thread
      @thread.value if wait
    end

    def kill(wait = false)
      @stop = true
      return unless @thread
      @thread.value if wait

      # Force shutdown of poller, in case the loop is stuck
      @thread.raise Eventboss::Shutdown
      @thread.value if wait
    end

    def fetch_and_dispatch
      fetch_messages.each do |message|
        logger.debug(id) { "enqueueing message #{message.message_id}" }
        @bus << UnitOfWork.new(queue, listener, message)
      end
    end

    def run
      fetch_and_dispatch until @stop
      @launcher.poller_stopped(self)
    rescue Eventboss::Shutdown
      @launcher.poller_stopped(self)
    rescue StandardError => exception
      handle_exception(exception, poller_id: id)
      # Give a chance for temporary AWS errors to be resolved
      # Sleep guarantees against repeating fast failure errors
      sleep TIME_WAIT
      @launcher.poller_stopped(self, restart: @stop == false)
    end

    private

    def fetch_messages
      logger.debug(id) { 'fetching messages' }
      @client.receive_message(
        queue_url: queue.url,
        max_number_of_messages: 10,
        wait_time_seconds: TIME_WAIT
      ).messages
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
eventboss-1.3.1 lib/eventboss/long_poller.rb
eventboss-1.3.0 lib/eventboss/long_poller.rb
eventboss-1.2.1 lib/eventboss/long_poller.rb
eventboss-1.2.0 lib/eventboss/long_poller.rb
eventboss-1.1.3 lib/eventboss/long_poller.rb