lib/boom_nats/application.rb in boom_nats-0.1.0 vs lib/boom_nats/application.rb in boom_nats-0.1.1
- old
+ new
@@ -2,22 +2,21 @@
require "concurrent-edge"
module BoomNats
class Application
attr_accessor :router, :nats_options
- attr_reader :route_topics, :nats
+ attr_reader :route_topics
def initialize
@route_topics = []
@subscriptions = []
@mutex = Mutex.new
end
def servers(value)
stop
-
- @nats = nats_connect(value)
+ @server = value
end
def draw_routes(&block)
raise Error, "required block given" unless block_given?
@@ -27,34 +26,42 @@
def setup(&block)
instance_eval(&block) if block_given?
end
+ def nats
+ NATS
+ end
+
def start
- @route_topics.each do |rt|
- @subscriptions << @nats.subscribe(rt.topic, rt.options) do |msg, reply, _sub|
- rt.executor.new(msg, reply, @nats, rt.serializer, rt.parser)
- end
- end
+ Thread.new do
+ nats_connect do |nats|
+ @route_topics.each do |rt|
+ @subscriptions << nats.subscribe(rt.topic, rt.options) do |msg, reply, _sub|
+ rt.executor.new(msg, reply, nats, rt.serializer, rt.parser)
+ end
+ end
- BoomNats.logger.debug "BoomNats::started"
+ BoomNats.logger.debug "BoomNats::started"
- return if defined?(Rails::Railtie)
+ return if defined?(Rails::Railtie)
- prepare_trap
+ prepare_trap
+ end
+ end
wait
end
def stop
- @subscriptions.each { |s| @nats.unsubscribe(s) }
+ @subscriptions.each { |s| nats.unsubscribe(s) }
@subscriptions = []
# disconnect from old server if already configured
- if @nats&.connected?
- @nats.drain do
- @nats.stop
+ if nats&.connected?
+ nats.drain do
+ nats.stop
end
end
end
def kill
@@ -66,24 +73,36 @@
exit unless defined?(Rspec)
end
end
end
- protected
-
- def nats_connect(servers)
- ch = Concurrent::Channel.new
+ def execute(&block)
+ timeout = Concurrent::Cancellation.timeout 5
+ done = Concurrent::Channel.new(capacity: 1)
Concurrent::Channel.go do
- # Connect to NATS service
- NATS.start({
- servers: servers,
- **(nats_options.is_a?(Hash) ? nats_options : {})
- }) do |nc|
- ch.put nc
+ loop do
+ @mutex.synchronize do
+ done << true if nats.connected?
+ end
+
+ done << false if timeout.origin.resolved?
end
end
- ch.take
+ if ~done
+ block.call(nats)
+ else
+ raise "Nats do not connected", BoomNats::Error
+ end
+ end
+
+ protected
+
+ def nats_connect(&block)
+ NATS.start({
+ servers: @server,
+ **(nats_options.is_a?(Hash) ? nats_options : {})
+ }, &block)
end
def prepare_trap
%w[INT TERM].each do |s|
trap(s) { kill }