lib/kurchatov/responders/ndjson_transport.rb in kurchatov-0.4.6 vs lib/kurchatov/responders/ndjson_transport.rb in kurchatov-0.4.7
- old
+ new
@@ -2,58 +2,37 @@
always_start true
ignore_errors true
default[:host], default[:port] = Kurchatov::Config[:nsjson_transport].to_s.split(":")
-default[:connect_timeout] = 1
-default[:send_timeout] = 1
-default[:exit_on_disconnect] = true
run_if do
!!plugin.host
end
helpers do
- def if_error
- if plugin.exit_on_disconnect
- Log.error("Socket #{plugin.host}:#{plugin.port} (#{@socket.inspect}) write error, exit..")
- exit 99
- end
- nil
- end
-
- def connect
- Timeout::timeout(plugin.connect_timeout) {
- @socket ||= (TCPSocket.new(plugin.host, plugin.port) rescue if_error)
- }
- end
-
def mutex
@mutex ||= Mutex.new
end
- def with_connection
- mutex.synchronize do
- yield(@socket || connect)
- end
- end
-
def flush
+ @tcpsocket ||= TCPSocket.new(plugin.host, plugin.port.to_i)
@events_to_send ||= events.to_flush
if !@events_to_send.empty?
@message = {"events" => @events_to_send}.to_json
- Log.debug("Message: #{@message}")
- with_connection do |socket|
- Timeout::timeout(plugin.send_timeout) {
- socket.puts(@message)
- } rescue if_error
- end
+ Log.info("Message: #{@message}")
+ mutex.synchronize { @tcpsocket.puts(@message) }
end
@events_to_send = nil
end
+ def flush_or_exit
+ flush rescue (exit 99)
+ end
+
end
run do
- loop { flush; sleep 1 }
+ Log.info("Start with #{plugin.host}:#{plugin.port}")
+ loop { flush_or_exit; sleep 1 }
end