Sha256: e99608aa00fd666dc623d2d44277ddd1cdba96c6a2d4bcf4277288f00fc6bbc4

Contents?: true

Size: 1.52 KB

Versions: 6

Compression:

Stored size: 1.52 KB

Contents

require 'socket'

# Wraps a stream socket and exchanges length-prefixed messages over it using
# two background threads: one drains an outgoing queue onto the socket, the
# other reads framed messages off the socket into an incoming queue.
#
# Wire format: a 4-byte big-endian unsigned length (pack 'N') followed by
# exactly that many payload bytes.
class NiceSocket
  # Size in bytes of the length prefix that precedes every message.
  HEADER_SIZE = 4

  # @return [Queue] queue of received message payloads (binary strings)
  attr_reader :inbox

  # Starts the sender and receiver threads on the given socket.
  #
  # @param tcp_socket [#read, #write, #close, #closed?] connected stream socket.
  #   NOTE(review): the default of nil is kept for compatibility, but both
  #   worker threads use the socket as soon as they have work to do, so a nil
  #   socket leaves them failing on first use.
  def initialize(tcp_socket=nil)
    @tcp_socket = tcp_socket
    @outbox = Queue.new

    @inbox = Queue.new
    # Kept for compatibility; the receiver currently pushes to @inbox directly
    # and does not consult this callback.
    @on_recv = lambda { |msg| @inbox << msg } # by default, stick the messages in the inbox

    @closing = false

    start_sender
    start_receiver
  end

  # Queues a message for transmission. The message is converted with #to_s
  # and written asynchronously by the sender thread.
  #
  # NOTE: this shadows Object#send on NiceSocket instances; use __send__ or
  # public_send for dynamic dispatch.
  #
  # @param msg [#to_s] payload to transmit
  # @return [Object] the original +msg+ argument, unchanged
  def send(msg)
    @outbox << msg.to_s
    msg
  end

  # Blocks until a message is available in the inbox and returns it.
  #
  # @return [String] the next received payload
  def recv
    @inbox.pop
  end

  # Opens a TCP connection and wraps it.
  #
  # @param domain [String] host name or address to connect to
  # @param port [Integer] TCP port
  # @return [NiceSocket]
  def self.connect(domain, port)
    new(TCPSocket.new(domain, port))
  end

  private

  # Raised internally when fewer payload bytes arrive than the header promised.
  class BadMessage < StandardError ; end
  # Raised internally when a complete length header cannot be read.
  class BadHeader < StandardError ; end

  # Spawns the thread that reads framed messages and pushes each payload
  # onto the inbox. The thread closes the socket and stops on EOF, on a
  # truncated frame, or on any socket-level error.
  def start_receiver
    @receiver_thread = Thread.new {
      loop do
        begin
          # IO#read blocks until the requested byte count or EOF; recv could
          # legally return a short chunk on a stream socket and split a frame.
          header = @tcp_socket.read(HEADER_SIZE)
          raise BadHeader if header.nil? || header.bytesize != HEADER_SIZE

          expected_length = header.unpack('N')[0]
          # NOTE(review): the length comes from the peer and is not bounded here.
          message = @tcp_socket.read(expected_length)
          raise BadMessage if message.nil? || message.bytesize != expected_length

          puts ">>> #{message.inspect}"

          @inbox << message
        rescue BadMessage, BadHeader, SystemCallError, IOError
          close_socket
          break
        end
      end
    }
  end

  # Spawns the thread that pops queued messages and writes them to the
  # socket, each preceded by its byte length. The thread closes the socket
  # and stops on any socket-level error.
  def start_sender
    @sender_thread = Thread.new {
      loop do
        msg = @outbox.pop
        begin
          puts "<<< #{msg.inspect}"
          # The prefix must count bytes, not characters, or multibyte payloads
          # desynchronise the stream. msg.b avoids an encoding clash when the
          # binary header is concatenated with a non-ASCII payload. A single
          # write keeps header and body together and retries partial writes.
          @tcp_socket.write([msg.bytesize].pack('N') + msg.b)
        rescue SystemCallError, IOError
          close_socket
          break
        end
      end
    }
  end

  # Closes the underlying socket if it is still open. Either worker thread
  # may get here first, so a concurrent close is tolerated.
  def close_socket
    @tcp_socket.close unless @tcp_socket.closed?
  rescue IOError
    nil
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
bum-0.0.17 lib/nice_socket.rb
bum-0.0.16 lib/nice_socket.rb
bum-0.0.15 lib/nice_socket.rb
bum-0.0.14 lib/nice_socket.rb
bum-0.0.13 lib/nice_socket.rb
bum-0.0.12 lib/nice_socket.rb