Sha256: 9baf200a98d23b2f363b7d87ccf2a7403c1b24877ab0b83c79f84f79fd717b83
Contents?: true
Size: 1.91 KB
Versions: 10
Compression:
Stored size: 1.91 KB
Contents
module Eventboss
  # LongPoller fetches messages from SQS using Long Polling
  # http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html
  # It starts one thread per queue (handled by Launcher)
  #
  # Each received message is wrapped in a UnitOfWork and pushed onto the bus.
  # The launcher is notified through +poller_stopped+ whenever the polling
  # loop exits, whether normally, on forced shutdown, or after an error.
  class LongPoller
    include Logging
    include SafeThread

    # Used both as the long-poll wait time passed to +receive_message+ and as
    # the back-off sleep (in seconds) after an unexpected error.
    TIME_WAIT = 10

    attr_reader :id, :queue, :listener

    # @param launcher [#poller_stopped] notified when the polling loop exits
    # @param bus [#<<] receives one UnitOfWork per fetched message
    # @param client [#receive_message] client used to fetch messages
    #   (presumably an SQS client - confirm against the caller)
    # @param queue [#name, #url] queue to poll
    # @param listener [Object] handed unchanged to every UnitOfWork
    def initialize(launcher, bus, client, queue, listener)
      @id = "poller-#{queue.name}"
      @launcher = launcher
      @bus = bus
      @client = client
      @queue = queue
      @listener = listener
      @thread = nil
      @stop = false
    end

    # Starts the polling loop in a thread created by +safe_thread+.
    def start
      @thread = safe_thread(id, &method(:run))
    end

    # Asks the polling loop to stop after the current iteration.
    #
    # @param wait [Boolean] when true, block until the thread has finished
    def terminate(wait = false)
      @stop = true
      return unless @thread

      @thread.value if wait
    end

    # Forces the poller to shut down by raising Eventboss::Shutdown inside
    # its thread.
    #
    # @param wait [Boolean] when true, block until the thread has finished
    def kill(wait = false)
      @stop = true
      return unless @thread

      # Force shutdown of poller, in case the loop is stuck.
      # Raise first and only then join: joining before the raise would block
      # on a stuck loop, which is the very case this method exists for.
      @thread.raise Eventboss::Shutdown
      @thread.value if wait
    end

    # Fetches one batch of messages and pushes each onto the bus as a
    # UnitOfWork bound to this poller's queue and listener.
    def fetch_and_dispatch
      fetch_messages.each do |message|
        logger.debug("enqueueing message #{message.message_id}", id)
        @bus << UnitOfWork.new(queue, listener, message)
      end
    end

    # Thread body: polls until a stop is requested, then reports to the
    # launcher. After an unexpected error the launcher is asked to restart
    # this poller unless a stop was requested in the meantime.
    def run
      fetch_and_dispatch until @stop
      @launcher.poller_stopped(self)
    rescue Eventboss::Shutdown
      @launcher.poller_stopped(self)
    rescue StandardError => exception
      handle_exception(exception, poller_id: id)
      # Give a chance for temporary AWS errors to be resolved
      # Sleep guarantees against repeating fast failure errors
      sleep TIME_WAIT
      @launcher.poller_stopped(self, restart: !@stop)
    end

    private

    # Requests up to 10 messages from the queue, waiting up to TIME_WAIT
    # seconds, and returns the +messages+ of the response.
    def fetch_messages
      logger.debug('fetching messages', id)
      @client.receive_message(
        queue_url: queue.url,
        max_number_of_messages: 10,
        wait_time_seconds: TIME_WAIT
      ).messages
    end
  end
end
Version data entries
10 entries across 10 versions & 1 rubygems