Sha256: cfb650d6461efb1420f14ac616028f1634386d87b6aebb8e07fb712419b7cb7b

Contents?: true

Size: 1.9 KB

Versions: 27

Compression:

Stored size: 1.9 KB

Contents

# This class takes care of the subscribing side of the messaging system.
module DispatchRider
  class Subscriber
    attr_reader :queue_service_registrar, :dispatcher, :demultiplexer

    def initialize
      @queue_service_registrar = DispatchRider::Registrars::QueueService.new
      @dispatcher = DispatchRider::Dispatcher.new
    end

    def register_queue(name, options = {})
      queue_service_registrar.register(name, options)
      self
    end

    def register_handler(name)
      dispatcher.register(name)
      self
    end

    def register_handlers(*names)
      names.each {|name| register_handler(name)}
      self
    end

    def setup_demultiplexer(queue_name, error_handler = DispatchRider::DefaultErrorHandler)
      queue = queue_service_registrar.fetch(queue_name)
      @demultiplexer ||= DispatchRider::Demultiplexer.new(queue, dispatcher, error_handler)
      self
    end

    def process
      register_quit_trap
      register_term_trap
      register_int_trap

      demultiplexer.start
    end

    private

    def register_quit_trap
      Signal.trap("QUIT") do
        # signal number: 3
        logger.info "Received SIGQUIT, stopping demultiplexer"
        demultiplexer.stop(reason: "Got SIGQUIT")
      end
    end

    def register_term_trap
      Signal.trap("TERM") do
        # signal number: 15
        logger.info "Received SIGTERM, stopping demultiplexer"
        demultiplexer.stop(reason: "Got SIGTERM")
      end
    end

    def register_int_trap
      @already_interrupted = false
      Signal.trap("INT") do
        if @already_interrupted
          logger.info "Received SIGINT second time, aborting"
          exit(0)
        else
          logger.info "Received SIGINT first time, stopping demultiplexer"
          demultiplexer.stop(reason: "Got SIGINT")
        end
        @already_interrupted = true
      end
    end

    def logger
      DispatchRider.config.logger
    end
  end
end

Version data entries

27 entries across 27 versions & 1 rubygems

Version Path
dispatch-rider-2.1.0 lib/dispatch-rider/subscriber.rb
dispatch-rider-2.0.0 lib/dispatch-rider/subscriber.rb
dispatch-rider-1.9.0 lib/dispatch-rider/subscriber.rb
dispatch-rider-1.8.6 lib/dispatch-rider/subscriber.rb
dispatch-rider-1.8.5 lib/dispatch-rider/subscriber.rb
dispatch-rider-1.8.4 lib/dispatch-rider/subscriber.rb
dispatch-rider-1.8.3 lib/dispatch-rider/subscriber.rb
dispatch-rider-1.8.2 lib/dispatch-rider/subscriber.rb
dispatch-rider-1.8.1 lib/dispatch-rider/subscriber.rb
dispatch-rider-1.8.0 lib/dispatch-rider/subscriber.rb
dispatch-rider-1.7.2 lib/dispatch-rider/subscriber.rb
dispatch-rider-1.7.1 lib/dispatch-rider/subscriber.rb
dispatch-rider-1.7.0 lib/dispatch-rider/subscriber.rb
dispatch-rider-1.6.2 lib/dispatch-rider/subscriber.rb
dispatch-rider-1.6.1 lib/dispatch-rider/subscriber.rb
dispatch-rider-1.6.0 lib/dispatch-rider/subscriber.rb
dispatch-rider-1.5.3 lib/dispatch-rider/subscriber.rb
dispatch-rider-1.5.2 lib/dispatch-rider/subscriber.rb
dispatch-rider-1.5.1 lib/dispatch-rider/subscriber.rb
dispatch-rider-1.5.0 lib/dispatch-rider/subscriber.rb