Sha256: 7cd400ca5b1848ff0a9570582b2001f7dee2ed2903bde99b187d6531969c5fdd

Contents?: true

Size: 1.93 KB

Versions: 1

Compression:

Stored size: 1.93 KB

Contents

module Impala
  class Connection
    SLEEP_INTERVAL = 0.1

    def initialize(host, port)
      @host = host
      @port = port
      @connected = false
      open
    end

    def open
      return if @connected

      socket = Thrift::Socket.new(@host, @port)

      @transport = Thrift::BufferedTransport.new(socket)
      @transport.open

      proto = Thrift::BinaryProtocol.new(@transport)
      @service = Protocol::ImpalaService::Client.new(proto)
      @connected = true
    end

    def close
      @transport.close
      @connected = false
    end

    def open?
      @connected
    end

    def query(raw_query)
      execute(raw_query).fetch_all
    end

    def execute(raw_query)
      raise ConnectionError.new("Connection closed") unless open?

      query = sanitize_query(raw_query)
      handle = send_query(query)

      wait_for_result(handle)
      Cursor.new(handle, @service)
    end

    private

    def sanitize_query(raw_query)
      words = raw_query.split
      raise InvalidQueryError.new("Empty query") if words.empty?

      command = words.first.downcase
      if !KNOWN_COMMANDS.include?(command)
        raise InvalidQueryError.new("Unrecognized command: '#{words.first}'")
      end

      ([command] + words[1..-1]).join(' ')
    end

    def send_query(sanitized_query)
      query = Protocol::Beeswax::Query.new
      query.query = sanitized_query

      @service.query(query)
    end

    def wait_for_result(handle)
      #TODO select here, or something
      while true
        state = @service.get_state(handle)
        if state == Protocol::Beeswax::QueryState::FINISHED
          break
        elsif state == Protocol::Beeswax::QueryState::EXCEPTION
          close_handle(handle)
          raise ConnectionError.new("The query was aborted")
        end

        sleep(SLEEP_INTERVAL)
      end
    rescue
      close_handle(handle)
      raise
    end

    def close_handle(handle)
      @service.close(handle)
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
impala-0.1.2 lib/impala/connection.rb