Sha256: 77a836193734a036c2c9b08cc9d8d7b65b54b093fec4fdbbd98f918cec532e3d
Contents?: true
Size: 1.92 KB
Versions: 5
Compression:
Stored size: 1.92 KB
Contents
module Eventboss # LongPoller fetches messages from SQS using Long Polling # http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html # It starts one thread per queue (handled by Launcher) class LongPoller include Logging include SafeThread TIME_WAIT = 10 attr_reader :id, :queue, :listener def initialize(launcher, bus, client, queue, listener) @id = "poller-#{queue.name}" @launcher = launcher @bus = bus @client = client @queue = queue @listener = listener @thread = nil @stop = false end def start @thread = safe_thread(id, &method(:run)) end def terminate(wait = false) @stop = true return unless @thread @thread.value if wait end def kill(wait = false) @stop = true return unless @thread @thread.value if wait # Force shutdown of poller, in case the loop is stuck @thread.raise Eventboss::Shutdown @thread.value if wait end def fetch_and_dispatch fetch_messages.each do |message| logger.debug(id) { "enqueueing message #{message.message_id}" } @bus << UnitOfWork.new(queue, listener, message) end end def run fetch_and_dispatch until @stop @launcher.poller_stopped(self) rescue Eventboss::Shutdown @launcher.poller_stopped(self) rescue StandardError => exception handle_exception(exception, poller_id: id) # Give a chance for temporary AWS errors to be resolved # Sleep guarantees against repeating fast failure errors sleep TIME_WAIT @launcher.poller_stopped(self, restart: @stop == false) end private def fetch_messages logger.debug(id) { 'fetching messages' } @client.receive_message( queue_url: queue.url, max_number_of_messages: 10, wait_time_seconds: TIME_WAIT ).messages end end end
Version data entries
5 entries across 5 versions & 1 rubygems