lib/jvertica.rb in jvertica-0.1.9 vs lib/jvertica.rb in jvertica-0.1.10

- old
+ new

@@ -1,62 +1,71 @@ require 'java' require 'thread' -require 'jvertica/version' require 'jdbc-vertica' require Jdbc::Vertica.driver_jar +require 'jvertica/version' +require 'jvertica/result_set' +require 'jvertica/row' +require 'jvertica/error' +require 'jvertica/constant' class Jvertica unless %r{java} === RUBY_PLATFORM warn "only for use with JRuby" end - @@default_option_value = { + DEFAULT_OPTION_VALUES = { host: 'localhost', port: 5433, database: 'vdb', password: '', user: 'dbadmin', AutoCommit: false, } - def self.connect options = {} - new options + def self.connect(options = {}) + new(options) end attr_reader :host, :port, :database - def initialize options - options = @@default_option_value.merge(options).to_sym - @host = options[:host] - @port = options[:port] - @database = options[:database] - %w(:host :port :database).map do |key| - options.delete key - end + def initialize(options) + options = options.inject({}) {|h, (k, v)| h[k.to_sym] = v; h } # symbolize_keys + options = DEFAULT_OPTION_VALUES.merge(options) + @host = options.delete(:host) + @port = options.delete(:port) + @database = options.delete(:database) prop = Properties.new options.each do |key, value| - prop.put key.to_s, value + prop.put(key.to_s, value) unless value.nil? end - @connection = begin - DriverManager.getConnection "jdbc:vertica://#{host}:#{port}/#{database}", prop - rescue => e - raise ConnectionError.new("Connection Failed.\n" + - "Error Message => #{e.message}\n" + - "see documentation => #{Constant::CONNECTION_PROPERTY_DOCUMENT_URL}\n") - end + @connection = + begin + DriverManager.getConnection("jdbc:vertica://#{host}:#{port}/#{database}", prop) + rescue => e + raise ConnectionError.new( + "Connection Failed.\n" << + "Error Message => #{e.message}\n" << + "see documentation => #{Constant::CONNECTION_PROPERTY_DOCUMENT_URL}\n" + ) + end + @closed = false @connection end def closed? @closed end def close - @connection.close && @closed = true + @connection.close + ensure + @connection = nil + @closed = true end def commit @connection.commit end @@ -75,79 +84,81 @@ #def execute *args, &blk # TODO #end - def property key, value = nil + def property(key, value = nil) key = key.to_s if value.nil? - @connection.getProperty key + @connection.getProperty(key) else - @connection.setProperty key, value + @connection.setProperty(key, value) end end - def query query, &blk + def query(query, &blk) stmt = @connection.createStatement case query - when %r{\A\s*copy}miu then return stmt.execute query - when %r{\A\s*insert}miu then return stmt.executeUpdate query - when %r{\A\s*update}miu then return stmt.executeUpdate query - when %r{\A\s*delete}miu then return stmt.executeUpdate query - when %r{\A\s*drop}miu then return stmt.execute query - when %r{\A\s*create}miu then return stmt.execute query - when %r{\A\s*set}miu then return stmt.execute query - else rs = stmt.executeQuery query + when %r{\A\s*copy}miu then return stmt.execute(query) + when %r{\A\s*insert}miu then return stmt.executeUpdate(query) + when %r{\A\s*update}miu then return stmt.executeUpdate(query) + when %r{\A\s*delete}miu then return stmt.executeUpdate(query) + when %r{\A\s*drop}miu then return stmt.execute(query) + when %r{\A\s*create}miu then return stmt.execute(query) + when %r{\A\s*set}miu then return stmt.execute(query) + else rs = stmt.executeQuery(query) end if block_given? ResultSet.new(rs).each do |row| yield row end else - ResultSet.new rs + ResultSet.new(rs) end end - def copy query, source = nil, &blk + def copy(query, source = nil, &blk) raise InvalidQuery.new('can use only "copy".') unless %r{\A\s*copy}miu === query - if !source.nil? or block_given? + if source or block_given? copy_stream(query, source, &blk) else [query(query), nil] end end private - def copy_stream query, io = nil, &blk - stream = com.vertica.jdbc.VerticaCopyStream.new @connection, query + + class DriverManager < java.sql.DriverManager + end + + class Properties < java.util.Properties + end + + class DataSource < com.vertica.jdbc.DataSource + end + + def copy_stream(query, io = nil, &blk) + unless ((io and io.is_a?(IO)) or block_given?) + raise InvalidObject.new("block or IO object is required.") + end + + stream = com.vertica.jdbc.VerticaCopyStream.new(@connection, query) stream.start thread = nil + begin if block_given? - i, o = IO.pipe - begin - thread = Thread.new do - yield(o) - o.close - end - stream.addStream org.jruby.util.IOInputStream.new(i) - rescue => e - raise e - ensure + thread = Thread.new do + yield(o) + o.close end - + stream.addStream(org.jruby.util.IOInputStream.new(i)) else - - if io.is_a? IO - stream.addStream org.jruby.util.IOInputStream.new(io) - else - raise InvalidObject.new("source must be a IO.") - end - + stream.addStream(org.jruby.util.IOInputStream.new(io)) end rescue => e r = stream.finish raise e.class.new("[affected rows: #{r}] #{e.message}") @@ -155,264 +166,12 @@ begin stream.execute rejects = stream.getRejects results = stream.finish - rescue => e - raise e ensure thread.join unless thread.nil? end [results, rejects.to_ary] - end - - class ConnectionError < StandardError - end - - class InvalidQuery < StandardError - end - - class InvalidObject < StandardError - end - - class InsufficientArgument < StandardError - end - - class ResultSet - include Enumerable - - def each - return enum_for(:each) unless block_given? - return if closed? - - while @nrow - idx = 0 - row = Jvertica::Row.new( - @col_labels, - @col_labels_d, - @getters.map{|gt| - case gt - when :getBigNum - v = @rset.getBigDecimal idx+=1 - @rset.was_null ? nil : v.toPlainString.to_i - when :getBigDecimal - v = @rset.getBigDecimal idx+=1 - @rset.was_null ? nil : BigDecimal.new(v.toPlainString) - else - v = @rset.send gt, idx+=1 - @rset.was_null ? nil : v - end - }, - @rownum += 1 - ) - close unless @nrow = @rset.next - yield row - end - close - end - - def close - return if closed? - - @rset.close - @close_callback.call if @close_callback - @closed = true - end - - def closed? - @closed - end - - def initialize rset, &close_callback - unless rset.respond_to? :get_meta_data - rset.close if rset - @closed = true - return - end - - @close_callback = close_callback - @rset = rset - @rsmd = @rset.get_meta_data - @num_col = @rsmd.get_column_count - @getters = [] - @col_labels = [] - @col_labels_d = [] - (1..@num_col).each do |i| - type = @rsmd.get_column_type i - - @getters << - case type - when java.sql.Types::NUMERIC, java.sql.Types::DECIMAL - precision = @rsmd.get_precision i - scale = @rsmd.get_scale i - - if precision > 0 and scale >= 0 - if scale > 0 then :getBigDecimal - else - - if precision <= 9 then :getInt - elsif precision <= 18 then :getLong - else :getBigNum - end - - end - else :getBigDecimal - end - - else Jvertica::Constant::GETTER_MAP.fetch type, :get_string - end - - label = @rsmd.get_column_label i - @col_labels << label - @col_labels_d << label.downcase - end - - @rownum = -1 - @nrow = @rset.next - @closed = false - end - end - - class Row - attr_reader :labels, :values, :rownum - alias_method :keys, :labels - - include Enumerable - - def [] *idx - return @values[*idx] if idx.length > 1 - - idx = idx.first - case idx - when Fixnum - raise RangeError.new("Index out of bound") if idx >= @values.length - @values[idx] - when String, Symbol - vidx = @labels_d.index(idx.to_s.downcase) or - raise NameError.new("Unknown column label: #{idx}") - @values[vidx] - else - @values[idx] - end - end - - def each(&blk) - @values.each do |v| - yield v - end - end - - def inspect - strs = [] - @labels.each do |col| - strs << "#{col}: #{self[col] || '(null)'}" - end - '[' + strs.join(', ') + ']' - end - - def to_a - @values - end - - def join sep = $OUTPUT_FIELD_SEPARATOR - to_a.join sep - end - - def eql? other - self.hash == other.hash - end - - def hash - @labels.zip(@values).sort.hash - end - - def to_h - Hash[@labels.zip @values] - end - - alias :== :eql? - - def initialize col_labels, col_labels_d, values, rownum - @labels = col_labels - @labels_d = col_labels_d - @values = values - @rownum = rownum - end - - def method_missing symb, *args - if vidx = @labels_d.index(symb.to_s.downcase) - @values[vidx] - elsif @values.respond_to? symb - @values.send symb, *args - else - raise NoMethodError.new("undefined method or attribute `#{symb}'") - end - end - - [:id, :tap, :gem, :display, :class, :method, :methods, :trust].select do |s| - method_defined? s - end.each do |m| - undef_method m - end - end - - class DriverManager < java.sql.DriverManager - end - - class Properties < java.util.Properties - end - - class DataSource < com.vertica.jdbc.DataSource - end - - module Constant - CONNECTION_PROPERTY_DOCUMENT_URL = - 'http://my.vertica.com/docs/7.1.x/HTML/index.htm#Authoring/ProgrammersGuide/ClientJDBC/JDBCConnectionProperties.htm' - - RUBY_SQL_TYPE_MAP = { - Fixnum => java.sql.Types::INTEGER, - Bignum => java.sql.Types::BIGINT, - String => java.sql.Types::VARCHAR, - Float => java.sql.Types::DOUBLE, - Time => java.sql.Types::TIMESTAMP - } - GETTER_MAP = { - java.sql.Types::TINYINT => :getInt, - java.sql.Types::SMALLINT => :getInt, - java.sql.Types::INTEGER => :getInt, - java.sql.Types::BIGINT => :getLong, - java.sql.Types::CHAR => :getString, - java.sql.Types::VARCHAR => :getString, - java.sql.Types::LONGVARCHAR => :getString, - (java.sql.Types::NCHAR rescue nil) => :getString, - (java.sql.Types::NVARCHAR rescue nil) => :getString, - (java.sql.Types::LONGNVARCHAR rescue nil) => :getString, - java.sql.Types::BINARY => :getBinaryStream, - java.sql.Types::VARBINARY => :getBinaryStream, - java.sql.Types::LONGVARBINARY => :getBinaryStream, - java.sql.Types::REAL => :getDouble, - java.sql.Types::FLOAT => :getFloat, - java.sql.Types::DOUBLE => :getDouble, - java.sql.Types::DATE => :getDate, - java.sql.Types::TIME => :getTime, - java.sql.Types::TIMESTAMP => :getTimestamp, - java.sql.Types::BLOB => :getBlob, - java.sql.Types::CLOB => :getString, - (java.sql.Types::NCLOB rescue nil) => :getString, - java.sql.Types::BOOLEAN => :getBoolean - } - end -end - -class Hash - def to_sym - self.inject self.class.new do |h, (k, v)| - h[k.to_sym] = if v.is_a? self.class - v.to_sym - else - v - end - h - end end end