Sha256: 81d27785ccb5a16cb6dfd933c029bc2fd268b3d595cfe98741f219e0114685c0

Contents?: true

Size: 1.16 KB

Versions: 5

Compression:

Stored size: 1.16 KB

Contents

# Responder plugin that ships flushed events to a TCP endpoint as one JSON
# document per line.
name 'new line delimeter json transport'

always_start true
ignore_errors true

# The endpoint comes from Kurchatov::Config[:nsjson_transport] and is split on
# ":" into host and port. When the setting is missing both values are nil;
# otherwise the port is kept as the String produced by the split.
transport_host, transport_port = Kurchatov::Config[:nsjson_transport].to_s.split(":")
default[:host] = transport_host
default[:port] = transport_port

# Limits handed to Timeout by the connect and flush helpers.
default[:connect_timeout] = 1
default[:send_timeout] = 1

# When true, the if_error helper terminates the process with status 99.
default[:exit_on_disconnect] = true

# The plugin only runs when a host has been configured; the block always
# yields a strict boolean.
run_if do
  plugin.host ? true : false
end

helpers do

  # Reacts to a socket failure (used as the fallback of the `rescue` clauses
  # in connect and flush).
  #
  # * plugin.exit_on_disconnect set: logs the failure and terminates the
  #   process with exit status 99.
  # * otherwise: logs the failure, then closes and forgets the current socket
  #   so that with_connection (which prefers @socket) opens a new connection
  #   next time instead of writing to a dead one forever.
  #
  # Always returns nil when it returns at all.
  def if_error
    if plugin.exit_on_disconnect
      Log.error("Socket #{plugin.host}:#{plugin.port} (#{@socket.inspect}) write error, exit..")
      exit 99
    end

    # $! is the exception currently being handled (nil when called outside a
    # rescue); include it so the failure is not swallowed silently.
    Log.error("Socket #{plugin.host}:#{plugin.port} (#{@socket.inspect}) error (#{$!.inspect}), dropping connection")

    stale_socket = @socket
    @socket = nil
    begin
      stale_socket.close if stale_socket.respond_to?(:close)
    rescue StandardError
      nil # the socket is already unusable; nothing more to clean up
    end
    nil
  end

  # Opens (once) and memoizes the TCP connection to plugin.host:plugin.port.
  #
  # The attempt is limited to plugin.connect_timeout. The rescue is placed
  # outside the Timeout block on purpose: a timeout only surfaces as a
  # rescuable Timeout::Error once the block has been left, so rescuing inside
  # the block would let connect timeouts bypass if_error.
  #
  # Returns the socket, or the result of if_error (nil) when connecting fails.
  def connect
    @socket ||= Timeout.timeout(plugin.connect_timeout) do
      TCPSocket.new(plugin.host, plugin.port)
    end
  rescue StandardError
    if_error
  end

  # Lazily created lock shared by every with_connection call.
  def mutex
    return @mutex if @mutex

    @mutex = Mutex.new
  end

  # Yields the current socket while holding the mutex, connecting first when
  # no socket is stored yet. Returns whatever the block returns.
  def with_connection
    mutex.synchronize do
      active_socket = @socket || connect
      yield(active_socket)
    end
  end

  # Sends the pending batch of events as a single JSON line.
  #
  # The batch is taken from events.to_flush unless one is already stored in
  # @events_to_send. A non-empty batch is serialized as {"events": [...]} and
  # written with puts under plugin.send_timeout; any StandardError raised by
  # the write (including the timeout) is handed to if_error. The stored batch
  # is cleared afterwards in every non-raising path. Returns nil.
  def flush
    @events_to_send ||= events.to_flush

    unless @events_to_send.empty?
      @message = {"events" => @events_to_send}.to_json
      Log.debug("Message: #{@message}")

      with_connection do |connection|
        begin
          Timeout.timeout(plugin.send_timeout) { connection.puts(@message) }
        rescue StandardError
          if_error
        end
      end
    end

    @events_to_send = nil
  end

end

# Main loop: flush pending events, pause one second, repeat forever.
run do
  loop do
    flush
    sleep 1
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
kurchatov-0.4.6 lib/kurchatov/responders/ndjson_transport.rb
kurchatov-0.4.5 lib/kurchatov/responders/ndjson_transport.rb
kurchatov-0.4.4 lib/kurchatov/responders/ndjson_transport.rb
kurchatov-0.4.3 lib/kurchatov/responders/ndjson_transport.rb
kurchatov-0.4.2 lib/kurchatov/responders/ndjson_transport.rb