Sha256: 303692ee9085ff1bcd6c049aa49ac1a9cd5f148c6b51c8656f7882c4b5d77ba4

Contents?: true

Size: 1.44 KB

Versions: 1

Compression:

Stored size: 1.44 KB

Contents

module Karafka
  # Karafka consuming server class
  class Server
    class << self
      # We need to store reference to all the consumers in the main server thread,
      # So we can have access to them later on and be able to stop them on exit
      attr_reader :consumers

      # Method which runs app
      def run
        @consumers = Concurrent::Array.new
        bind_on_sigint
        bind_on_sigquit
        start_supervised
      end

      private

      # @return [Karafka::Process] process wrapper instance used to catch system signal calls
      def process
        Karafka::Process.instance
      end

      # What should happen when we decide to quit with sigint
      def bind_on_sigint
        process.on_sigint do
          Karafka::App.stop!
          consumers.map(&:stop) if Karafka::App.running?
          exit
        end
      end

      # What should happen when we decide to quit with sigquit
      def bind_on_sigquit
        process.on_sigquit do
          Karafka::App.stop!
          consumers.map(&:stop) if Karafka::App.running?
          exit
        end
      end

      # Starts Karafka with a supervision
      # @note We don't need to sleep because Karafka::Fetcher is locking and waiting to
      # finish loop (and it won't happen until we explicitily want to stop)
      def start_supervised
        process.supervise do
          Karafka::App.run!
          Karafka::Fetcher.new.fetch_loop
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
karafka-0.5.0 lib/karafka/server.rb