Sha256: caca145aaba33937a1f5db3cb40fae2cd29b0d1aa3aeebf7711f5a1e57d38a01

Contents?: true

Size: 1.44 KB

Versions: 1

Compression:

Stored size: 1.44 KB

Contents

module GELF
  module Transport
    class TCP
      attr_reader :addresses

      # `addresses` Array of [host, port] pairs
      def initialize(addresses)
        @sockets = []
        self.addresses = addresses
      end

      def addresses=(addresses)
        @addresses = addresses.dup.freeze.tap do |addrs|
          @sockets.each(&:close)
          @sockets = addrs.map { |peer| connect(*peer) }
        end
      end

      def send(message)
        return if @addresses.empty?
        loop do
          connected = @sockets.reject(&:closed?)
          reconnect_all if connected.empty?
          break if write_any(connected, message)
        end
      end

      private

      def connect(host, port)
        socket_class.new(host, port)
      end

      def reconnect_all
        @sockets = @sockets.each_with_index.map do |old_socket, index|
          old_socket.closed? ? connect(*@addresses[index]) : old_socket
        end
      end

      def socket_class
        if defined?(Celluloid::IO::TCPSocket)
          Celluloid::IO::TCPSocket
        else
          ::TCPSocket
        end
      end

      def write_any(sockets, message)
        sockets.shuffle.each do |socket|
          return true if write_socket(socket, message)
        end
        false
      end

      def write_socket(socket, message)
        socket.write(message) > 0
      rescue IOError, SystemCallError
        socket.close unless socket.closed?
        false
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
gelf-3.0.0 lib/gelf/transport/tcp.rb