lib/kurchatov/responders/ndjson_transport.rb in kurchatov-0.4.6 vs lib/kurchatov/responders/ndjson_transport.rb in kurchatov-0.4.7

- old
+ new

@@ -2,58 +2,37 @@ always_start true ignore_errors true default[:host], default[:port] = Kurchatov::Config[:nsjson_transport].to_s.split(":") -default[:connect_timeout] = 1 -default[:send_timeout] = 1 -default[:exit_on_disconnect] = true run_if do !!plugin.host end helpers do - def if_error - if plugin.exit_on_disconnect - Log.error("Socket #{plugin.host}:#{plugin.port} (#{@socket.inspect}) write error, exit..") - exit 99 - end - nil - end - - def connect - Timeout::timeout(plugin.connect_timeout) { - @socket ||= (TCPSocket.new(plugin.host, plugin.port) rescue if_error) - } - end - def mutex @mutex ||= Mutex.new end - def with_connection - mutex.synchronize do - yield(@socket || connect) - end - end - def flush + @tcpsocket ||= TCPSocket.new(plugin.host, plugin.port.to_i) @events_to_send ||= events.to_flush if !@events_to_send.empty? @message = {"events" => @events_to_send}.to_json - Log.debug("Message: #{@message}") - with_connection do |socket| - Timeout::timeout(plugin.send_timeout) { - socket.puts(@message) - } rescue if_error - end + Log.info("Message: #{@message}") + mutex.synchronize { @tcpsocket.puts(@message) } end @events_to_send = nil end + def flush_or_exit + flush rescue (exit 99) + end + end run do - loop { flush; sleep 1 } + Log.info("Start with #{plugin.host}:#{plugin.port}") + loop { flush_or_exit; sleep 1 } end