Sha256: f377d9f6ca7f6a81c2f15d710637816aa50f8c3df32e1ee1fbbcc76483ed0ee4

Contents?: true

Size: 1.74 KB

Versions: 4

Compression:

Stored size: 1.74 KB

Contents

module Telegraph
  class Wire
    include Logging

    attr_reader :stream

    def self.connect(host, port)
      wire = new TCPSocket.new(host, port)
      return wire unless block_given?
      begin
        yield wire
      ensure
        wire.close
      end
    end

    def initialize(stream)
      @sequence = AckSequence.new
      @stream = stream
    end

    def close
      if @stream.closed?
        debug { "stream already closed" }
      else
        debug { "closing stream #{@stream.inspect}" }
        @stream.close
      end
    end

    def closed?
      @stream.closed?
    end

    def send_message(body, options = {})
      sequence_ack = options[:ack] ? options[:ack].sequence_number : nil
      message = Message.new(body, @sequence.next, sequence_ack)
      message.write stream
      unacked_sequences_numbers[message.sequence_number] = message if options[:need_ack]
    rescue IOError, Errno::EPIPE, Errno::ECONNRESET => e
      close rescue nil
      raise LineDead, e.message
    end

    def process_messages(options = {:timeout => 0})
      yield next_message(options) while true
    rescue NoMessageAvailable
      retry
    end

    def next_message(options = {:timeout => 0})
      begin
        raise NoMessageAvailable unless IO.select [@stream], nil, nil, options[:timeout]
        message = Message.read(@stream)
        unacked_sequences_numbers.delete message.sequence_ack if message.sequence_ack
        return message
      rescue IOError, Errno::ECONNRESET => e
        raise LineDead, e.message
      end
    rescue LineDead
      close rescue nil
      raise
    end

    def unacked_sequences_numbers
      @unacked_sequences_numbers ||= {}
    end

    def unacked_messages
      unacked_sequences_numbers.values
    end
  end
end

Version data entries

4 entries across 4 versions & 2 rubygems

Version Path
deep_test_pre-2.0 lib/telegraph/wire.rb
jstorimer-deep-test-2.0.0 lib/telegraph/wire.rb
jstorimer-deep-test-0.2.0 lib/telegraph/wire.rb
jstorimer-deep-test-0.1.0 lib/telegraph/wire.rb