Sha256: 9ab42b25b6ca2fe8ab4fe0289c1c5b68d6d9624b588cc45bda9b5fc9d8f7d8fc
Contents?: true
Size: 1.88 KB
Versions: 1
Compression:
Stored size: 1.88 KB
Contents
# This class takes care of the subscribing side of the messaging system.
#
# Typical usage chains the registration calls (each returns the subscriber
# itself) and finishes with #process:
#
#   subscriber.register_queue(name, options)
#             .register_handlers(*names)
#             .setup_demultiplexer(name)
#             .process
module DispatchRider
  class Subscriber
    # Upper bound (in seconds) on how long the "aborting" log line is given to
    # be written before the process exits on a second SIGINT.
    TRAP_LOG_FLUSH_TIMEOUT = 1

    attr_reader :queue_service_registrar, :dispatcher, :demultiplexer

    # Builds a fresh queue-service registrar and dispatcher. The demultiplexer
    # stays nil until #setup_demultiplexer is called.
    def initialize
      @queue_service_registrar = DispatchRider::Registrars::QueueService.new
      @dispatcher = DispatchRider::Dispatcher.new
    end

    # Registers a queue service with the queue-service registrar.
    #
    # @param name the name passed through to the registrar
    # @param options [Hash] passed through to the registrar unchanged
    # @return [Subscriber] self, to allow chaining
    def register_queue(name, options = {})
      queue_service_registrar.register(name, options)
      self
    end

    # Registers a single handler with the dispatcher.
    #
    # @param name the handler name passed through to the dispatcher
    # @return [Subscriber] self, to allow chaining
    def register_handler(name)
      dispatcher.register(name)
      self
    end

    # Registers several handlers, in the order given.
    #
    # @param names the handler names; each is passed to #register_handler
    # @return [Subscriber] self, to allow chaining
    def register_handlers(*names)
      names.each { |name| register_handler(name) }
      self
    end

    # Creates the demultiplexer for the named queue.
    #
    # The queue is looked up on every call, but the demultiplexer is only
    # built once: later calls keep the instance created by the first call.
    #
    # @param queue_name the name of a previously registered queue
    # @param error_handler passed through to the demultiplexer
    # @return [Subscriber] self, to allow chaining
    def setup_demultiplexer(queue_name, error_handler = DispatchRider::DefaultErrorHandler)
      queue = queue_service_registrar.fetch(queue_name)
      @demultiplexer ||= DispatchRider::Demultiplexer.new(queue, dispatcher, error_handler)
      self
    end

    # Installs the QUIT, TERM and INT signal handlers and starts the
    # demultiplexer.
    #
    # @return whatever the demultiplexer's #start returns
    # @raise [RuntimeError] if #setup_demultiplexer has not been called; this
    #   is checked first so no process-wide signal handler is replaced when
    #   there is nothing to start
    def process
      unless demultiplexer
        raise "Demultiplexer has not been set up; call setup_demultiplexer before process"
      end

      register_quit_trap
      register_term_trap
      register_int_trap
      demultiplexer.start
    end

    private

    def register_quit_trap
      register_stop_trap("QUIT") # signal number: 3
    end

    def register_term_trap
      register_stop_trap("TERM") # signal number: 15
    end

    # Installs a handler that logs the signal and asks the demultiplexer to stop.
    def register_stop_trap(signal)
      Signal.trap(signal) do
        log_from_trap "Received SIG#{signal}, stopping demultiplexer"
        demultiplexer.stop("Got SIG#{signal}")
      end
    end

    # First SIGINT requests a stop; a second SIGINT exits the process.
    def register_int_trap
      @already_interrupted = false
      Signal.trap("INT") do
        if @already_interrupted
          # Wait briefly for the log line, because exit would otherwise kill
          # the logging thread before it runs. The wait is bounded so a busy
          # logger cannot keep the process from exiting.
          log_from_trap("Received SIGINT second time, aborting").join(TRAP_LOG_FLUSH_TIMEOUT)
          exit(0)
        else
          log_from_trap "Received SIGINT first time, stopping demultiplexer"
          demultiplexer.stop("Got SIGINT")
        end
        @already_interrupted = true
      end
    end

    # Logs from a separate thread. Since Ruby 2.0 a mutex cannot be locked
    # inside a trap handler, and the standard Logger synchronizes its writes,
    # so logging directly from the handler loses the message ("can't be called
    # from trap context").
    #
    # @return [Thread] the thread doing the logging
    def log_from_trap(message)
      Thread.new { logger.info(message) }
    end

    def logger
      DispatchRider.config.logger
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
dispatch-rider-1.3.1 | lib/dispatch-rider/subscriber.rb |