require 'bigdecimal'
require 'thread'

class HBase
# Handle on a single HBase table. Data operations (put / delete / increment)
# are issued through a per-thread HTable obtained from the table pool, and
# scan-style calls are delegated to an HBase::Scoped built by #each.
#
# @!attribute [r] name
#   @return [String] The name of the table
# @!attribute [r] config
#   @return [org.apache.hadoop.conf.Configuration]
class Table
  attr_reader :name
  attr_reader :config

  include Enumerable
  include Admin
  include Scoped::Aggregation::Admin
  include HBase::Util

  # (INTERNAL) Returns the underlying org.apache.hadoop.hbase.client.HTable object (local to current thread)
  # @return [org.apache.hadoop.hbase.client.HTable]
  # @raise [RuntimeError] if the owning HBase connection is already closed
  def htable
    check_closed

    # One HTable per table name is cached under Thread.current[:htable],
    # so each thread fetches its own instance from the pool on first use.
    local_htables = Thread.current[:htable] ||= {}
    local_htables[@name] ||= @pool.get_table(@name)
  end

  # Does nothing; kept only so that existing callers do not break.
  # @deprecated
  # @return [nil]
  def close
    nil
  end

  # Scoping shortcuts: each of these builds a fresh scope via #each and sends
  # the same message, with the same arguments, to it.
  # NOTE(review): a block given to one of these methods is not forwarded to
  # the scope -- confirm that this is intended.
  [:get, :count, :aggregate,
   :range, :project, :filter, :while,
   :limit, :versions, :caching, :batch,
   :time_range, :at
  ].each do |method|
    define_method(method) do |*args|
      self.each.send(method, *args)
    end
  end

  # Delegates to the with_java_scan method of a fresh scope, passing the block along.
  def with_java_scan(&block)
    self.each.with_java_scan(&block)
  end

  # Delegates to the with_java_get method of a fresh scope, passing the block along.
  def with_java_get(&block)
    self.each.with_java_get(&block)
  end

  # Performs PUT operations
  # @overload put(rowkey, data)
  #   Put operation on a rowkey
  #   @param [Object] rowkey Rowkey
  #   @param [Hash] data Data to put
  #   @return [Integer] Number of puts succeeded
  # @overload put(data)
  #   Put operation on multiple rowkeys
  #   @param [Hash] data Data to put indexed by rowkeys
  #   @return [Integer] Number of puts succeeded
  def put(*args)
    # Two-argument form is normalized into the single-hash form.
    return put(args.first => args.last) if args.length == 2

    puts = args.first.map { |rowkey, props| putify rowkey, props }
    htable.put puts
    puts.length
  end

  # Deletes data
  # @overload delete(rowkey)
  #   Deletes a row with the given rowkey
  #   @param [Object] rowkey
  #   @return [nil]
  #   @example
  #     table.delete('a000')
  # @overload delete(rowkey, column_family)
  #   Deletes columns with the given column family from the row
  #   @param [Object] rowkey
  #   @param [String] column_family
  #   @return [nil]
  #   @example
  #     table.delete('a000', 'cf1')
  # @overload delete(rowkey, column)
  #   Deletes a column
  #   @param [Object] rowkey
  #   @param [String, Array] column Column expression in String "FAMILY:QUALIFIER", or in Array [FAMILY, QUALIFIER]
  #   @return [nil]
  #   @example
  #     table.delete('a000', 'cf1:col1')
  # @overload delete(rowkey, column, *timestamps)
  #   Deletes specified versions of a column
  #   @param [Object] rowkey
  #   @param [String, Array] column Column expression in String "FAMILY:QUALIFIER", or in Array [FAMILY, QUALIFIER]
  #   @param [*Integer] timestamps Timestamps.
  #   @return [nil]
  #   @example
  #     table.delete('a000', 'cf1:col1', 1352978648642)
  # @overload delete(*delete_specs)
  #   Batch deletion
  #   @param [Array<Array>] delete_specs
  #   @return [nil]
  #   @example
  #     table.delete(
  #       ['a000', 'cf1:col1', 1352978648642],
  #       ['a001', 'cf1:col1'],
  #       ['a002', 'cf1'],
  #       ['a003'])
  def delete(*args)
    # A leading Array means every argument is a delete spec (batch form);
    # otherwise the whole argument list is a single spec.
    specs = args.first.is_a?(Array) ? args : [args]

    deletes = specs.map do |spec|
      rowkey, cfcq, *ts = spec
      cf, cq = Util.parse_column_name(cfcq) if cfcq

      Delete.new(Util.to_bytes(rowkey)).tap { |del|
        if !ts.empty?
          # Only the listed versions
          ts.each do |t|
            del.deleteColumn cf, cq, time_to_long(t)
          end
        elsif cq
          # Delete all versions
          del.deleteColumns cf, cq
        elsif cf
          del.deleteFamily cf
        end
        # With neither family nor qualifier the Delete is left unrestricted.
      }
    end

    htable.delete deletes
  end

  # Delete rows.
  # @param [*Object] rowkeys List of rowkeys of rows to delete
  # @return [nil]
  def delete_row(*rowkeys)
    htable.delete rowkeys.map { |rk| Delete.new(Util.to_bytes(rk)) }
  end

  # Atomically increase numeric values
  # @overload increment(rowkey, column, by)
  #   Atomically increase column value by the specified amount
  #   @param [Object] rowkey Rowkey
  #   @param [String, Array] column Column expression in String "FAMILY:QUALIFIER", or in Array [FAMILY, QUALIFIER]
  #   @param [Integer] by Increment amount (1 when omitted)
  #   @return [Integer] Column value after increment
  #   @example
  #     table.increment('a000', 'cf1:col1', 1)
  # @overload increment(rowkey, column_by_hash)
  #   Atomically increase values of multiple columns
  #   @param [Object] rowkey Rowkey
  #   @param [Hash] column_by_hash Column expression to increment amount pairs
  #   @example
  #     table.increment('a000', 'cf1:col1' => 1, 'cf1:col2' => 2)
  def increment(rowkey, *args)
    if args.first.is_a?(Hash)
      cols = args.first
      inc = Increment.new(Util.to_bytes(rowkey)).tap { |i|
        cols.each do |col, by|
          cf, cq = Util.parse_column_name(col)
          i.addColumn cf, cq, by
        end
      }
      htable.increment inc
    else
      col, by = args
      cf, cq = Util.parse_column_name(col)
      htable.incrementColumnValue Util.to_bytes(rowkey), cf, cq, by || 1
    end
  end

  # Scan through the table
  # @yield [row] Yields each row in the scope
  # @yieldparam [HBase::Result] row
  # @return [HBase::Scoped]
  # @raise [RuntimeError] if the owning HBase connection is already closed
  def each
    check_closed

    # Scoped.new is invoked through send, so it is presumably not public.
    if block_given?
      Scoped.send(:new, self).each { |r| yield r }
    else
      Scoped.send(:new, self)
    end
  end

private
  # @param hbase Owner object; only its closed? method is used here
  # @param config Value exposed through the config reader
  # @param htable_pool Pool whose get_table(name) supplies HTable instances
  # @param name Table name; stored as a String
  def initialize(hbase, config, htable_pool, name)
    @hbase  = hbase
    @config = config
    @pool   = htable_pool
    @name   = name.to_s
  end

  # @raise [RuntimeError] if the owning HBase connection is already closed
  def check_closed
    raise RuntimeError, "HBase connection is already closed" if @hbase.closed?
  end

  # Builds a Put for one row.
  # @param [Object] rowkey Rowkey
  # @param [Hash] props Column expression => value. When the value is itself
  #   a Hash, each of its keys is either a timestamp (Time or Integer) for a
  #   versioned put, or something else (e.g. a type symbol) that is handed to
  #   Util.to_bytes together with its value.
  # @return The populated Put object
  def putify(rowkey, props)
    Put.new(Util.to_bytes(rowkey)).tap { |put|
      props.each do |col, val|
        cf, cq = Util.parse_column_name(col)
        case val
        when Hash
          val.each do |t, v|
            case t
            # Timestamp / Ruby Time
            # (Integer rather than Fixnum: Fixnum was deprecated in Ruby 2.4
            # and removed in 3.2; Integer matches everything Fixnum did.)
            when Time, Integer
              put.add cf, cq, time_to_long(t), Util.to_bytes(v)
            # Types: :byte, :short, :int, ...
            else
              put.add cf, cq, Util.to_bytes(t => v)
            end
          end
        else
          put.add cf, cq, Util.to_bytes(val)
        end
      end
    }
  end
end#Table
end#HBase