Sha256: 602e70bf54e16a22023521b653ef75f0c2b72438762878cee91d08ec4b7499e7

Contents?: true

Size: 1.97 KB

Versions: 11

Compression:

Stored size: 1.97 KB

Contents

module GELF
  module Transport
    class TCP
      attr_reader :addresses

      # `addresses` Array of [host, port] pairs
      def initialize(addresses)
        @sockets = []
        self.addresses = addresses
      end

      def addresses=(addresses)
        @addresses = addresses.dup.freeze.tap do |addrs|
          @sockets.each(&:close)
          @sockets = addrs.map { |peer| connect(*peer) }
        end
      end

      def send(message)
        return if @addresses.empty?
        loop do
          connected = @sockets.reject(&:closed?)
          reconnect_all if connected.empty?
          break if write_any(connected, message)
        end
      end

      private

      def connect(host, port)
        socket_class.new(host, port)
      end

      def reconnect_all
        @sockets = @sockets.each_with_index.map do |old_socket, index|
          old_socket.closed? ? connect(*@addresses[index]) : old_socket
        end
      end

      def socket_class
        if defined?(Celluloid::IO::TCPSocket)
          Celluloid::IO::TCPSocket
        else
          ::TCPSocket
        end
      end

      def write_any(sockets, message)
        sockets.shuffle.each do |socket|
          return true if write_socket(socket, message)
        end
        false
      end

      def write_socket(socket, message)
        unsafe_write_socket(socket, message)
      rescue IOError, SystemCallError
        socket.close unless socket.closed?
        false
      end

      def unsafe_write_socket(socket, message)
        r,w = IO.select([socket], [socket])
        # Read everything first
        while r.any? do
          # don't expect any reads, but a readable socket might
          # mean the remote end closed, so read it and throw it away.
          # we'll get an EOFError if it happens.
          socket.sysread(16384)
          r = IO.select([socket])
        end

        # Now send the payload
        return false unless w.any?
        return socket.syswrite(message) > 0
      end
    end
  end
end

Version data entries

11 entries across 11 versions & 3 rubygems

Version Path
gelf_redux-4.1.0 lib/gelf/transport/tcp.rb
gelf_redux-4.0.1 lib/gelf/transport/tcp.rb
gelf_redux-4.0.0 lib/gelf/transport/tcp.rb
gelf_redux-3.2.2 lib/gelf/transport/tcp.rb
gelf_redux-3.1.2 lib/gelf/transport/tcp.rb
gelf_redux-3.2.1 lib/gelf/transport/tcp.rb
gelf_redux-3.1.1 lib/gelf/transport/tcp.rb
gelf.fitterpen-3.1.1 lib/gelf/transport/tcp.rb
gelf.fitterpen-3.1.0 lib/gelf/transport/tcp.rb
gelf-3.1.0 lib/gelf/transport/tcp.rb
gelf-3.1.0.pre.rc.1 lib/gelf/transport/tcp.rb