lib/boom_nats/application.rb in boom_nats-0.1.0 vs lib/boom_nats/application.rb in boom_nats-0.1.1

- old
+ new

@@ -2,22 +2,21 @@ require "concurrent-edge" module BoomNats class Application attr_accessor :router, :nats_options - attr_reader :route_topics, :nats + attr_reader :route_topics def initialize @route_topics = [] @subscriptions = [] @mutex = Mutex.new end def servers(value) stop - - @nats = nats_connect(value) + @server = value end def draw_routes(&block) raise Error, "required block given" unless block_given? @@ -27,34 +26,42 @@ def setup(&block) instance_eval(&block) if block_given? end + def nats + NATS + end + def start - @route_topics.each do |rt| - @subscriptions << @nats.subscribe(rt.topic, rt.options) do |msg, reply, _sub| - rt.executor.new(msg, reply, @nats, rt.serializer, rt.parser) - end - end + Thread.new do + nats_connect do |nats| + @route_topics.each do |rt| + @subscriptions << nats.subscribe(rt.topic, rt.options) do |msg, reply, _sub| + rt.executor.new(msg, reply, nats, rt.serializer, rt.parser) + end + end - BoomNats.logger.debug "BoomNats::started" + BoomNats.logger.debug "BoomNats::started" - return if defined?(Rails::Railtie) + return if defined?(Rails::Railtie) - prepare_trap + prepare_trap + end + end wait end def stop - @subscriptions.each { |s| @nats.unsubscribe(s) } + @subscriptions.each { |s| nats.unsubscribe(s) } @subscriptions = [] # disconnect from old server if already configured - if @nats&.connected? - @nats.drain do - @nats.stop + if nats&.connected? + nats.drain do + nats.stop end end end def kill @@ -66,24 +73,36 @@ exit unless defined?(Rspec) end end end - protected - - def nats_connect(servers) - ch = Concurrent::Channel.new + def execute(&block) + timeout = Concurrent::Cancellation.timeout 5 + done = Concurrent::Channel.new(capacity: 1) Concurrent::Channel.go do - # Connect to NATS service - NATS.start({ - servers: servers, - **(nats_options.is_a?(Hash) ? nats_options : {}) - }) do |nc| - ch.put nc + loop do + @mutex.synchronize do + done << true if nats.connected? + end + + done << false if timeout.origin.resolved? end end - ch.take + if ~done + block.call(nats) + else + raise "Nats do not connected", BoomNats::Error + end + end + + protected + + def nats_connect(&block) + NATS.start({ + servers: @server, + **(nats_options.is_a?(Hash) ? nats_options : {}) + }, &block) end def prepare_trap %w[INT TERM].each do |s| trap(s) { kill }