Sha256: 4ebe104d5102d24a4d1bdb5d6308be50d7b04d9ec98bc593185c375780907d60

Contents?: true

Size: 1.96 KB

Versions: 1

Compression:

Stored size: 1.96 KB

Contents

module Impala
  class Cursor
    include Enumerable

    def initialize(handle, service, buffer_length=1024)
      @handle = handle
      @service = service
      @metadata = @service.get_results_metadata(@handle)

      @buffer_length = buffer_length
      @row_buffer = []

      @done = false
      @closed = false
    end

    def each
      while row = fetch_row
        yield row
      end
    end

    def fetch_row
      raise CursorError.new("Cursor has expired or been closed") if @closed

      if @row_buffer.empty?
        if @done
          return nil
        else
          fetch_more
        end
      end

      @row_buffer.shift
    end

    def fetch_all
      self.to_a
    end

    def close
      @closed = true
      @service.close(@handle)
    end

    private

    def fetch_more
      return if @done

      begin
        res = @service.fetch(@handle, false, @buffer_length)
      rescue Protocol::Beeswax::BeeswaxException => e
        @closed = true
        raise CursorError.new("Cursor has expired or been closed")
      end

      rows = res.data.map { |raw| parse_row(raw) }
      @row_buffer.concat(rows)
      @done = true unless res.has_more
    end

    def parse_row(raw)
      row = {}
      fields = raw.split(@metadata.delim)

      fields.zip(@metadata.schema.fieldSchemas).each do |raw_value, schema|
        value = convert_raw_value(raw_value, schema)
        row[schema.name.to_sym] = value
      end

      row
    end

    def convert_raw_value(value, schema)
      return nil if value == 'NULL'

      case schema.type
      when 'string'
        value
      when 'boolean'
        if value == 'true'
          true
        elsif value == 'false'
          false
        else
          raise ParsingError.new("Invalid value for boolean: #{value}")
        end
      when 'tinyint', 'int', 'bigint'
        value.to_i
      when 'double'
        value.to_f
      else
        raise ParsingError.new("Unknown type: #{schema.type}")
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
impala-0.1.2 lib/impala/cursor.rb