Sha256: 762cb62fdb1e25c8290ac4d15ca8634d2c29f692ff64af87a33c2414ec2e4d70

Contents?: true

Size: 1.98 KB

Versions: 2

Compression:

Stored size: 1.98 KB

Contents

module Impala
  class Connection
    SLEEP_INTERVAL = 0.1

    def initialize(host='localhost', port=21000)
      @host = host
      @port = port
      @connected = false
      open
    end

    def open
      return if @connected

      socket = Thrift::Socket.new(@host, @port)

      @transport = Thrift::BufferedTransport.new(socket)
      @transport.open

      proto = Thrift::BinaryProtocol.new(@transport)
      @service = Protocol::ImpalaService::Client.new(proto)
      @connected = true
    end

    def close
      @transport.close
      @connected = false
    end

    def open?
      @connected
    end

    def query(raw_query)
      execute(raw_query).fetch_all
    end

    def execute(raw_query)
      raise ConnectionError.new("Connection closed") unless open?

      words = raw_query.split
      if words.empty?
        raise InvalidQueryError.new("Empty query")
      elsif !KNOWN_COMMANDS.include?(words.first.downcase)
        raise InvalidQueryError.new("Unrecognized command: '#{words.first}'")
      end

      query = sanitize_query(raw_query)
      handle = send_query(query)

      wait_for_result(handle)
      Cursor.new(handle, @service)
    end

    private

    def sanitize_query(raw)
      #TODO?
      raw.downcase
    end

    def send_query(sanitized_query)
      query = Protocol::Beeswax::Query.new
      query.query = sanitized_query

      @service.query(query)
    end

    def wait_for_result(handle)
      begin
        #TODO select here, or something
        while true
          state = @service.get_state(handle)
          if state == Protocol::Beeswax::QueryState::FINISHED
            break
          elsif state == Protocol::Beeswax::QueryState::EXCEPTION
            close_handle(handle)
            raise ConnectionError.new("The query was aborted")
          end

          sleep(SLEEP_INTERVAL)
        end
      rescue Interrupt
        close_handle(handle)
        raise
      end
    end

    def close_handle(handle)
      @service.close(handle)
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
impala-0.1.1 lib/impala/connection.rb
impala-0.1.0 lib/impala/connection.rb