Sha256: 5f042d3637a8d4e59a59b0a970b9c35930e2cb7bf7320ecb0e4adb6734b82bd8

Contents?: true

Size: 1.95 KB

Versions: 1

Compression:

Stored size: 1.95 KB

Contents

module PatronusFati
  # Common ancestor of the connection errors defined below.
  PatronusFatiError = Class.new(StandardError)
  # Raised in the reader thread when the stream reaches end-of-file.
  LostConnection    = Class.new(PatronusFatiError)
  # Raised in the reader thread when a read fails with Timeout::Error.
  ConnectionTimeout = Class.new(PatronusFatiError)
  # Raised in the reader thread for any other read failure.
  UnableToRead      = Class.new(PatronusFatiError)
  # Raised in the writer thread when sending a message fails.
  UnableToWrite     = Class.new(PatronusFatiError)

  # Line-oriented TCP client that moves data between a socket and two queues
  # using one background thread per direction.
  #
  # Incoming lines are pushed onto +read_queue+; messages pushed onto
  # +write_queue+ are sent as "!<sequence> <message>\r\n".
  class Connection
    attr_reader :port, :read_queue, :server, :write_queue

    # @param server [String] host handed to TCPSocket.new
    # @param port [Integer] port handed to TCPSocket.new
    def initialize(server, port)
      @server = server
      @port = port

      @read_queue = Queue.new
      @write_queue = Queue.new
    end

    # Opens the socket and starts the reader and writer threads.
    #
    # Does nothing when the connection is already live, so repeated calls do
    # not spawn duplicate workers. When a previous session died (a worker
    # closed the socket after an error) the leftovers are cleaned up first so
    # a fresh socket and fresh threads are created.
    #
    # @return [void]
    def connect
      return if connected?

      # Drop the stale socket and any worker that survived an earlier failure.
      disconnect

      establish_connection

      start_read_thread
      start_write_thread
    end

    # @return [Boolean] true only while a socket exists and is still open
    def connected?
      !socket.nil? && !socket.closed?
    end

    # Stops both worker threads, closes the socket if it is still open and
    # forgets it. Safe to call at any time, including before #connect and
    # after a worker thread has already failed.
    #
    # @return [nil]
    def disconnect
      [read_thread, write_thread].each { |worker| worker.kill if worker }
      self.read_thread = nil
      self.write_thread = nil

      socket.close if connected?

      self.socket = nil
    end

    # Removes and returns the oldest received line, blocking while the read
    # queue is empty.
    #
    # @return [String]
    def read
      read_queue.pop
    end

    # Queues a message for the writer thread. A nil/false message ends the
    # writer loop.
    #
    # @param msg [Object] payload interpolated into the outgoing line
    def write(msg)
      write_queue.push(msg)
    end

    protected

    attr_accessor :read_thread, :socket, :write_thread
    attr_writer :read_queue, :write_queue

    # Creates the TCP socket unless a live one already exists.
    def establish_connection
      return if connected?
      self.socket = TCPSocket.new(server, port)
    end

    # Spawns the thread that copies every line read from the socket onto
    # +read_queue+. On failure the socket is closed and the thread terminates
    # with ConnectionTimeout, LostConnection or UnableToRead.
    def start_read_thread
      # Bind the IO now so the error handlers never observe a socket that
      # #disconnect has meanwhile reset to nil.
      io = socket

      self.read_thread = Thread.new do
        begin
          while (line = io.readline)
            read_queue << line
          end
        rescue Timeout::Error => e
          abort_io(io, ConnectionTimeout, e)
        rescue EOFError => e
          abort_io(io, LostConnection, e)
        rescue => e
          abort_io(io, UnableToRead, e)
        end
      end
    end

    # Spawns the thread that drains +write_queue+, prefixing each message with
    # "!" and a sequence number that starts at 0 for every new thread. On
    # failure the socket is closed and the thread terminates with
    # UnableToWrite.
    def start_write_thread
      io = socket

      self.write_thread = Thread.new do
        begin
          count = 0
          while (msg = write_queue.pop)
            io.write("!%i %s\r\n" % [count, msg])
            count += 1
          end
        rescue => e
          abort_io(io, UnableToWrite, e)
        end
      end
    end

    # Closes +io+ (when there is one and it is still open) and raises
    # +error_class+ carrying the message of the original failure.
    def abort_io(io, error_class, failure)
      io.close if io && !io.closed?
      raise error_class, failure.message
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
patronus_fati-0.8.12 lib/patronus_fati/connection.rb