Sha256: 8aaff1b07abf7c77401e1a12607e1bebff6b958fa5bd2814f7bf38d693cf9872
Contents?: true
Size: 1.39 KB
Versions: 2
Compression:
Stored size: 1.39 KB
Contents
module Karafka # Karafka consuming server class class Server class << self # We need to store reference to all the consumers in the main server thread, # So we can have access to them later on and be able to stop them on exit attr_reader :consumers # Method which runs app def run @consumers = Concurrent::Array.new bind_on_sigint bind_on_sigquit start_supervised end private # @return [Karafka::Process] process wrapper instance used to catch system signal calls def process Karafka::Process.instance end # What should happen when we decide to quit with sigint def bind_on_sigint process.on_sigint do Karafka::App.stop! consumers.map(&:stop) exit end end # What should happen when we decide to quit with sigquit def bind_on_sigquit process.on_sigquit do Karafka::App.stop! consumers.map(&:stop) exit end end # Starts Karafka with a supervision # @note We don't need to sleep because Karafka::Fetcher is locking and waiting to # finish loop (and it won't happen until we explicitily want to stop) def start_supervised process.supervise do Karafka::App.run! Karafka::Fetcher.new.fetch_loop end end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
karafka-0.5.0.2 | lib/karafka/server.rb |
karafka-0.5.0.1 | lib/karafka/server.rb |