#
# Copyright (c) 2011 RightScale Inc
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

# If the cassandra gem is installed and 0.8 bindings are available, apply some
# monkey patches.
if require_succeeds?('cassandra/0.8')
  class Cassandra
    module Protocol
      # Monkey patch _get_indexed_slices so that it accepts a list of columns when doing an
      # indexed slice; otherwise specific columns cannot be retrieved via a secondary index lookup.
      def _get_indexed_slices(column_family, index_clause, columns, count, start, finish, reversed, consistency)
        column_parent = CassandraThrift::ColumnParent.new(:column_family => column_family)
        if columns
          predicate = CassandraThrift::SlicePredicate.new(:column_names => [columns].flatten)
        else
          predicate = CassandraThrift::SlicePredicate.new(:slice_range =>
            CassandraThrift::SliceRange.new(
              :reversed => reversed,
              :count    => count,
              :start    => start,
              :finish   => finish))
        end
        client.get_indexed_slices(column_parent, index_clause, predicate, consistency)
      end
    end

    # Monkey patch get_range_single so that it can return an array of key slices
    # rather than converting the result to a Hash.
    def get_range_single(column_family, options = {})
      return_empty_rows = options.delete(:return_empty_rows) || false
      slices_please     = options.delete(:slices_not_hash) || false

      column_family, _, _, options =
        extract_and_validate_params(column_family, "", [options],
                                    READ_DEFAULTS.merge(:start_key  => '',
                                                        :finish_key => '',
                                                        :key_count  => 100,
                                                        :columns    => nil,
                                                        :reversed   => false))

      results = _get_range(column_family,
                           options[:start_key].to_s,
                           options[:finish_key].to_s,
                           options[:key_count],
                           options[:columns],
                           options[:start].to_s,
                           options[:finish].to_s,
                           options[:count],
                           options[:consistency],
                           options[:reversed])

      if slices_please
        results
      else
        multi_key_slices_to_hash(column_family, results, return_empty_rows)
      end
    end

    # Monkey patch get_indexed_slices so that it returns an OrderedHash; otherwise the
    # next start key cannot be determined when getting results in chunks.
    def get_indexed_slices(column_family, index_clause, *columns_and_options)
      return false if Cassandra.VERSION.to_f < 0.7

      column_family, columns, _, options =
        extract_and_validate_params(column_family, [], columns_and_options,
                                    READ_DEFAULTS.merge(:key_count => 100, :key_start => ""))

      if index_clause.class != CassandraThrift::IndexClause
        index_expressions = index_clause.collect do |expression|
          create_index_expression(expression[:column_name], expression[:value], expression[:comparison])
        end

        index_clause = create_index_clause(index_expressions, options[:key_start], options[:key_count])
      end

      key_slices = _get_indexed_slices(column_family, index_clause, columns, options[:count],
                                       options[:start], options[:finish], options[:reversed],
                                       options[:consistency])

      key_slices.inject(OrderedHash.new) { |h, key_slice| h[key_slice.key] = key_slice.columns; h }
    end
  end
end
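
# A minimal sketch of what the patches above enable at the raw client level. The
# keyspace, host, column family, index name, and value below are illustrative
# assumptions, not part of this library:
#
#   client = Cassandra.new('my_keyspace', '127.0.0.1:9160')
#   expr   = client.create_idx_expr('org_id', '42', 'EQ')
#   clause = client.create_idx_clause([expr], '', 100)
#   # Specific columns can now be requested, and the result is an OrderedHash of
#   # row key => columns, so chunked iteration can resume from the last key.
#   rows = client.get_indexed_slices('users', clause, ['login'], :count => 100)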

# If the thrift gem is available and we are running JRuby, monkey patch it so it
# works!
if (RUBY_PLATFORM =~ /java/) && require_succeeds?('thrift')
  module Thrift
    class Socket
      def open
        begin
          addrinfo = ::Socket::getaddrinfo(@host, @port).first
          @handle = ::Socket.new(addrinfo[4], ::Socket::SOCK_STREAM, 0)
          sockaddr = ::Socket.sockaddr_in(addrinfo[1], addrinfo[3])
          begin
            @handle.connect_nonblock(sockaddr)
          rescue Errno::EINPROGRESS
            resp = IO.select(nil, [@handle], nil, @timeout)
            # 3 lines removed here, 1 line added
            begin
              @handle.connect_nonblock(sockaddr)
            rescue Errno::EISCONN
            end
          end
          @handle
        rescue StandardError => e
          raise TransportException.new(TransportException::NOT_OPEN, "Could not connect to #{@desc}: #{e}")
        end
      end
    end
  end
end

module RightSupport::DB
  # Exception that indicates database configuration info is missing.
  class MissingConfiguration < Exception; end
  class UnsupportedRubyVersion < Exception; end

  # Base class for a column family in a keyspace.
  # Used to access data persisted in Cassandra.
  # Provides wrappers for Cassandra client methods.
  class CassandraModel
    include RightSupport::Log::Mixin

    # For long-running operations, generate periodic log output after this many columns to help
    # observers figure out how much progress has been made
    LONG_OPERATION_LOG_PERIOD = 1_000_000

    # Default timeout for client connection to Cassandra server
    DEFAULT_TIMEOUT = 20

    # Default maximum number of columns to retrieve in one chunk
    DEFAULT_COUNT = 100

    # Default maximum number of rows to retrieve in one chunk
    DEFAULT_ROW_COUNT = 100

    # Wrappers for Cassandra client
    class << self

      attr_reader :default_keyspace
      attr_accessor :column_family

      @@current_keyspace = nil
      @@connections = {}

      METHODS_TO_LOG = [:multi_get, :get, :get_indexed_slices, :get_columns, :insert, :remove,
                        'multi_get', 'get', 'get_indexed_slices', 'get_columns', 'insert', 'remove']

      # This is required to be overridden in order to set the read CL
      def default_read_consistency
        nil
      end

      # This is required to be overridden in order to set the write CL
      def default_write_consistency
        nil
      end

      # Disallow usage of CassandraModel under Ruby < 1.9
      def inherited(base)
        raise UnsupportedRubyVersion, "Support only Ruby >= 1.9" unless RUBY_VERSION >= "1.9"
      end

      def config
        @@config
      end

      def env_config
        env = ENV['RACK_ENV']
        raise MissingConfiguration, "CassandraModel config is missing a '#{ENV['RACK_ENV']}' section" \
          unless !@@config.nil? && @@config.keys.include?(env) && @@config[env]
        @@config[env]
      end

      def config=(value)
        @@config = normalize_config(value) unless value.nil?
      end

      # Compute the token for a given row key, which can provide information on the progress of very large
      # "each" operations, e.g. iterating over all rows of a column family.
      #
      # @param [String] key byte-vector (binary String) representation of the row key
      # @return [Integer] the 128-bit token for a given row key, as used by Cassandra's RandomPartitioner
      def calculate_random_partitioner_token(key)
        number = Digest::MD5.hexdigest(key).to_i(16)

        if number >= (2**127)
          # Perform two's complement; basically this takes the absolute value of the number as
          # if it were a 128-bit signed number. Equivalent to Java's BigInteger.abs() operation.
          result = (number ^ (2**128)-1) + 1
        else
          # we're good
          result = number
        end

        result
      end
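
      # Worked example: a digest whose top bit is set maps to its 128-bit
      # two's-complement magnitude.
      #
      #   Digest::MD5.hexdigest("foo")  # => "acbd18db4cc2f85cedef654fccc4a4d8"
      #   # The leading 'a' means the value is >= 2**127, so the token is
      #   # 2**128 - 0xacbd18db4cc2f85cedef654fccc4a4d8
      #   calculate_random_partitioner_token("foo")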

      # As part of constructing the keyspace name, determine if we have a trailing namespace to
      # append. Applicable to rails_env test and integration, otherwise defaults to nil.
      def namespace(env)
        ( config[env] && config[env]['namespace'] ) ||
          begin
            case env
            when 'test'
              'testns'
            when 'integration'
              File.read('/etc/namespace').strip if File.file?('/etc/namespace')
            end
          end
      end

      # Return current keyspace names as Array of String (any keyspace that has been used this session).
      #
      # === Return
      # (Array):: keyspaces names
      def keyspaces
        @@connections.keys
      end

      # Return current active keyspace.
      #
      # === Return
      # keyspace(String):: current_keyspace or default_keyspace
      def keyspace
        @@current_keyspace || @@default_keyspace
      end

      # Sets the default keyspace
      #
      # === Parameters
      # keyspace(String):: Set the default keyspace
      def keyspace=(kyspc)
        env = ENV['RACK_ENV'] || 'development'
        nspace = namespace(env)
        @@default_keyspace = "#{kyspc}_#{env}"
        @@default_keyspace += "_#{nspace}" if nspace
      end

      # Temporarily change the working keyspace for this class for the duration of
      # the block. Resets working keyspace back to default once control has
      # returned to the caller.
      #
      # === Parameters
      # keyspace(String):: Keyspace name
      # append_env(true|false):: optional; default true - whether to append the environment name
      # block(Proc):: Code that will be called in keyspace context
      def with_keyspace(keyspace, append_env=true, &block)
        @@current_keyspace = keyspace
        env = ENV['RACK_ENV'] || 'development'
        nspace = namespace(env)
        if append_env
          if nspace
            tail = "_#{env}_#{nspace}"
          else
            tail = "_#{env}"
          end
          @@current_keyspace += tail unless @@current_keyspace.end_with?(tail)
        end
        block.call
      ensure
        @@current_keyspace = nil
      end
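
      # A brief sketch of keyspace selection, assuming RACK_ENV=development and the
      # hypothetical keyspace names below:
      #
      #   CassandraModel.keyspace = 'my_app'    # default keyspace: 'my_app_development'
      #   CassandraModel.with_keyspace('satellite') do
      #     # requests in this block go to 'satellite_development'
      #   end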

      def get_connection(current=nil)
        config = env_config

        thrift_client_options = {
          :timeout             => RightSupport::DB::CassandraModel::DEFAULT_TIMEOUT,
          :server_retry_period => nil,
        }
        thrift_client_options.merge!({:protocol => Thrift::BinaryProtocolAccelerated}) \
          if defined? Thrift::BinaryProtocolAccelerated

        current ||= Cassandra.new(keyspace, config["server"], thrift_client_options)
        current.disable_node_auto_discovery!
        current.default_write_consistency = self.default_write_consistency if self.default_write_consistency
        current.default_read_consistency  = self.default_read_consistency if self.default_read_consistency
        current
      end

      # Return a Cassandra client object connected to a server and authorized to a suitable keyspace.
      # Create a connection if one does not already exist; use BinaryProtocolAccelerated if available.
      #
      # This method determines the current keyspace based on the return value of self.keyspace,
      # which looks at the value of @@current_keyspace or @@default_keyspace to determine the keyspace
      # it is operating under. If a connection already exists for the keyspace it will re-use it.
      # If a connection does not exist, it will create a new persistent connection for that keyspace
      # that can be re-used with future requests.
      #
      # @return [Cassandra] a client object that can be used to send requests to the ring
      def conn()
        @@connections ||= {}

        @@connections[self.keyspace] = get_connection(@@connections[self.keyspace])
        @@connections[self.keyspace]
      end

      # Get row(s) for specified key(s)
      # Unless :count is specified, a maximum of 100 columns are retrieved
      #
      # === Parameters
      # k(String|Array):: Individual primary key or list of keys on which to match
      # opt(Hash):: Request options including :consistency and for column level
      #   control :count, :start, :finish, :reversed
      #
      # === Return
      # (Object|nil):: Individual row, or nil if not found, or ordered hash of rows
      def all(k, opt = {})
        real_get(k, opt)
      end

      # Get row for specified primary key and convert into object of given class
      # Unless :count is specified, a maximum of 100 columns are retrieved
      #
      # === Parameters
      # key(String):: Primary key on which to match
      # opt(Hash):: Request options including :consistency and for column level
      #   control :count, :start, :finish, :reversed
      #
      # === Return
      # (CassandraModel|nil):: Instantiated object of given class, or nil if not found
      def get(key, opt = {})
        if (attrs = real_get(key, opt)).empty?
          nil
        else
          new(key, attrs)
        end
      end

      # Get raw row(s) for specified primary key(s)
      # Unless :count is specified, a maximum of 100 columns are retrieved
      # except in the case of an individual primary key request, in which
      # case all columns are retrieved
      #
      # === Parameters
      # k(String|Array):: Individual primary key or list of keys on which to match
      # opt(Hash):: Request options including :consistency and for column level
      #   control :count, :start, :finish, :reversed
      #
      # === Return
      # (Cassandra::OrderedHash):: Individual row or OrderedHash of rows
      def real_get(k, opt = {})
        if k.is_a?(Array)
          do_op(:multi_get, column_family, k, opt)
        elsif opt[:count]
          do_op(:get, column_family, k, opt)
        else
          opt = opt.clone
          opt[:count] = DEFAULT_COUNT
          columns = Cassandra::OrderedHash.new
          loop do
            chunk = do_op(:get, column_family, k, opt)
            columns.merge!(chunk)
            if chunk.size == opt[:count]
              # Assume there are more chunks, use last key as start of next get
              opt[:start] = chunk.keys.last
            else
              # This must be the last chunk
              break
            end
          end
          columns
        end
      end
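
      # A minimal usage sketch. The subclass, column family, keyspace, and keys are
      # hypothetical and only illustrate the wrapper API:
      #
      #   class Book < RightSupport::DB::CassandraModel
      #     self.column_family = 'books'
      #   end
      #
      #   Book.config   = YAML.load_file('config/cassandra.yml')
      #   Book.keyspace = 'library'
      #   Book.insert('isbn-123', 'title' => 'Moby-Dick')
      #   book = Book.get('isbn-123')   # => Book instance, or nil if not found
      #   book['title'] if book         # => 'Moby-Dick'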

      # Get all rows for specified secondary key
      #
      # === Parameters
      # index(String):: Name of secondary index
      # key(String):: Index value that each selected row is required to match
      # columns(Array|nil):: Names of columns to be retrieved, defaults to all
      # opt(Hash):: Request options with only :consistency and :count used
      #
      # === Block
      # Optional block that is yielded each chunk as it is retrieved as an array
      # like the normally returned result
      #
      # === Return
      # (OrderedHash):: Rows retrieved with each key, value is columns
      def get_all_indexed_slices(index, key, columns = nil, opt = {})
        rows = Cassandra::OrderedHash.new
        start = ""
        count = opt.delete(:count) || DEFAULT_COUNT
        expr = do_op(:create_idx_expr, index, key, "EQ")
        opt = opt[:consistency] ? {:consistency => opt[:consistency]} : {}
        while true
          clause = do_op(:create_idx_clause, [expr], start, count)
          chunk = self.conn.get_indexed_slices(column_family, clause, columns, opt)
          rows.merge!(chunk)
          if chunk.size == count
            # Assume there are more chunks, use last key as start of next get
            start = chunk.keys.last
          else
            # This must be the last chunk
            break
          end
        end
        rows
      end
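
      # Sketch of a chunked secondary-index read (index name, value, and columns are
      # hypothetical); the result maps each matching row key to its columns:
      #
      #   rows = Book.get_all_indexed_slices('author', 'Melville', ['title'])
      #   rows.each { |row_key, cols| puts "#{row_key}: #{cols.size} column(s)" }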

      # This method is an attempt to circumvent the Cassandra gem limitation of returning only 100 columns
      # for wide rows, and also to help reliably iterate through a column family when the node is busy and
      # experiencing many timeouts.
      #
      # Internally, it uses Cassandra#get_indexed_slices to find rows that match your index constraint; for
      # each matching row key, it iterates through all columns of that row, in chunks, using
      # Cassandra#get_range. This approach is less efficient than grabbing some column values in the initial
      # #get_indexed_slices, but it allows us to preserve the natural ordering of the columns we yield, and
      # prevents us from yielding any column twice.
      #
      # A row key may be yielded more than once as each "chunk" of columns from that row is
      # read from the ring, but each column will be yielded exactly once.
      #
      # @return [Integer] total number of columns read
      #
      # @param [String] index column name
      # @param [String] index column value
      #
      # @yield [row_key, columns] yields one or more times for every row that contains a matching index
      #   column, ultimately yielding EVERY column in that row
      # @yieldparam [String] row_key the row key currently being processed
      # @yieldparam [Array] columns an array of CassandraThrift::ColumnOrSuperColumn objects
      def stream_all_indexed_slices(index, key)
        expr = do_op(:create_idx_expr, index, key, "EQ")

        start_row     = ''
        last_report   = 0
        total_rows    = 0
        total_columns = 0

        # Loop over all CF rows, with batches of X
        while (start_row != nil)
          # Reset these to their initial values on every iteration thru the loop, in case
          # we backed off due to timeouts (see rescue clauses below)
          #
          # Since this method is made for wide rows, also ask for 10x the default column count.
          row_count    = DEFAULT_ROW_COUNT
          column_count = DEFAULT_COUNT * 10

          clause = do_op(:create_idx_clause, [expr], start_row, row_count)

          # Ask for a single column from each row, because we don't care about the column values
          # in this step; we just want the row keys that contain a matching index column.
          begin
            row_keys = self.conn.get_indexed_slices(column_family, clause, :count => 1).keys
          rescue Exception => e
            if retryable_read_timeout?(e)
              logger.error "CassandraModel#stream_all_indexed_slices retrying get_indexed_slices with fewer rows due to a %s: %s @ %s (cf='%s' start_row='%s' row_count=%d)" %
                [e.class.name, e.message, e.backtrace.first, column_family, start_row, row_count]
              row_count /= 10 if row_count > 1
              retry
            else
              raise
            end
          end

          row_keys.each do |row_key|
            # We already processed this row the previous iteration; skip it
            next if row_key == start_row
            total_rows += 1

            start_column = ''
            while start_column != nil
              begin
                options = {
                  :start_key       => row_key,
                  :finish_key      => row_key,
                  :start           => start_column,
                  :count           => column_count,
                  :slices_not_hash => true
                }
                columns = self.conn.get_range(column_family, options).first.columns

                if columns[0].column.name == start_column
                  total_columns += columns.size - 1
                  yield(row_key, columns[1..-1])
                else
                  total_columns += columns.size
                  yield(row_key, columns)
                end

                # Help Ops figure out where we are
                if (total_columns - last_report > LONG_OPERATION_LOG_PERIOD)
                  logger.info "CassandraModel#stream_all_indexed_slices got RangeSlice total_rows=%d total_columns=%d (cf='%s' row='%s' start='%s' count=%d)" %
                    [total_rows, total_columns, column_family, row_key, start_column, column_count]
                  last_report = total_columns
                end

                if columns.size >= column_count
                  start_column = columns.last.column.name
                else
                  start_column = nil
                end
              rescue Exception => e
                if retryable_read_timeout?(e)
                  logger.error "CassandraModel#stream_all_indexed_slices retrying get_range with fewer cols due to a %s: %s @ %s (cf='%s' row='%s' start='%s' count=%d)" %
                    [e.class.name, e.message, e.backtrace.first, column_family, row_key, start_column, column_count]
                  column_count /= 10 if column_count > 1
                  retry
                else
                  raise
                end
              end
            end
          end

          if row_keys.size >= row_count
            start_row = row_keys.last
          else
            start_row = nil
          end
        end

        total_columns
      end
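
      # Streaming sketch (index name, value, and the process helper are hypothetical);
      # every column of every matching row is yielded exactly once:
      #
      #   Book.stream_all_indexed_slices('author', 'Melville') do |row_key, columns|
      #     columns.each { |c| process(row_key, c.column.name, c.column.value) }
      #   end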

      # Get all rows for specified secondary key
      #
      # === Parameters
      # index(String):: Name of secondary index
      # key(String):: Index value that each selected row is required to match
      # columns(Array|nil):: Names of columns to be retrieved, defaults to all
      # opt(Hash):: Request options with only :consistency used
      #
      # === Return
      # (Array):: Rows retrieved with each member being an instantiated object of the
      #   given class as value, but object only contains values for the columns retrieved;
      #   array is always empty if a block is given
      def get_indexed(index, key, columns = nil, opt = {})
        rows = []
        start = ""
        count = DEFAULT_COUNT
        expr = do_op(:create_idx_expr, index, key, "EQ")
        opt = opt[:consistency] ? {:consistency => opt[:consistency]} : {}
        loop do
          clause = do_op(:create_idx_clause, [expr], start, count)
          chunk = do_op(:get_indexed_slices, column_family, clause, columns, opt)
          chunk_rows = []
          chunk.each do |row_key, row_columns|
            if row_columns && row_key != start
              attrs = row_columns.inject({}) { |a, c| a[c.column.name] = c.column.value; a }
              chunk_rows << new(row_key, attrs)
            end
          end
          if block_given?
            yield chunk_rows
          else
            rows.concat(chunk_rows)
          end
          if chunk.size == count
            # Assume there are more chunks, use last key as start of next get
            start = chunk.keys.last
          else
            # This must be the last chunk
            break
          end
        end
        rows
      end

      # Get specific columns in row with specified key
      #
      # === Parameters
      # key(String):: Primary key on which to match
      # columns(Array):: Names of columns to be retrieved
      # opt(Hash):: Request options such as :consistency
      #
      # === Return
      # (Array):: Values of selected columns in the order specified
      def get_columns(key, columns, opt = {})
        do_op(:get_columns, column_family, key, columns, sub_columns = nil, opt)
      end

      # Insert a row for a key
      #
      # === Parameters
      # key(String):: Primary key for value
      # values(Hash):: Values to be stored
      # opt(Hash):: Request options such as :consistency
      #
      # === Return
      # (Array):: Mutation map and consistency level
      def insert(key, values, opt={})
        do_op(:insert, column_family, key, values, opt)
      end

      # Delete row or columns of row
      #
      # === Parameters
      # args(Array):: Key, columns, options
      #
      # === Return
      # (Array):: Mutation map and consistency level
      def remove(*args)
        do_op(:remove, column_family, *args)
      end

      # Open a batch operation and yield self
      # Inserts and deletes are queued until the block closes,
      # and then sent atomically to the server
      # Supports :consistency option, which overrides that set
      # in individual commands
      #
      # === Parameters
      # args(Array):: Batch options such as :consistency
      #
      # === Block
      # Required block making Cassandra requests
      #
      # === Return
      # (Array):: Mutation map and consistency level
      #
      # === Raise
      # Exception:: If block not specified
      def batch(*args, &block)
        raise "Block required!" unless block_given?
        do_op(:batch, *args, &block)
      end

      # Perform a Cassandra operation on the connection object.
      # Rescue IOError by automatically reconnecting and retrying the operation.
      #
      # === Parameters
      # meth(Symbol):: Method to be executed
      # *args(Array):: Method arguments to forward to the Cassandra connection
      #
      # === Block
      # Block if any to be executed by method
      #
      # === Return
      # (Object):: Value returned by executed method
      def do_op(meth, *args, &block)
        first_started_at ||= Time.now
        retries ||= 0
        started_at = Time.now

        # cassandra functionality
        result = conn.send(meth, *args, &block)

        # log functionality
        do_op_log(first_started_at, started_at, retries, meth, args[0], args[1])

        return result
      rescue IOError
        reconnect
        retries += 1
        retry
      end

      def do_op_log(first_started_at, started_at, retries, meth, cf, key)
        now = Time.now
        attempt_time = now - started_at

        if METHODS_TO_LOG.include?(meth)
          key_count = key.is_a?(Array) ? key.size : 1

          log_string = sprintf("CassandraModel %s, cf=%s, keys=%d, time=%.1fms",
                               meth, cf, key_count, attempt_time*1000)

          if retries && retries > 0
            total_time = now - first_started_at
            log_string += sprintf(", retries=%d, total_time=%.1fms", retries, total_time*1000)
          end

          logger.debug(log_string)
        end
      end

      # Reconnect to Cassandra server
      # Use BinaryProtocolAccelerated if it is available
      #
      # === Return
      # true:: Always return true
      def reconnect
        return false if keyspace.nil?

        @@connections[keyspace] = get_connection
        true
      end

      # Cassandra ring
      #
      # === Return
      # (Array):: Members of ring
      def ring
        conn.ring
      end

      private

      # Massage the configuration hash into a standard form.
      # @return the config hash, with contents normalized
      def normalize_config(untrusted_config)
        untrusted_config.each do |env, config|
          raise MissingConfiguration, "CassandraModel config is broken; the '#{ENV['RACK_ENV']}' section is missing the 'server' option" \
            unless config.keys.include?('server')

          server = config['server']

          if server.is_a?(String)
            # Strip surrounding brackets, in case Ops put a YAML array into an input value
            if server.start_with?('[') && server.end_with?(']')
              server = server[1..-2]
            end

            # Transform comma-separated host lists into an Array
            if server =~ /,/
              server = server.split(/\s*,\s*/)
            end
          end

          config['server'] = server
          config
        end
      end
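
      # For reference, a config shape the normalization above accepts (host names are
      # placeholders). A bracketed, comma-separated 'server' string is converted to
      # ['host1:9160', 'host2:9160']:
      #
      #   {
      #     'development' => { 'server' => '[host1:9160, host2:9160]' },
      #     'production'  => { 'server' => ['host1:9160', 'host2:9160'] }
      #   }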

      # Determine whether an exception can safely be retried, if the operation being performed
      # was a read (i.e. is idempotent).
      # @param [Exception] e the exception in question
      # @return [Boolean] true if yes, false otherwise
      def retryable_read_timeout?(e)
        # Thrift RPC timeout that has been "improved" by cassandra gem's moronic exception-wrapping
        wrapped_timeout = e.is_a?(CassandraThrift::TimedOutException)

        # I have no idea what these are, but they show up when we have a local socket-read timeout after 10s
        # Probably due to this line from the thrift gem (in lib/thrift/transport/socket.rb)
        #   raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out reading #{sz} bytes from #{@desc}")
        # The TransportException gets wrapped into a CassandraThrift::Cassandra::Client::TransportException and in the process
        # the type is lost and the message is promoted to "type"
        bogus_timeout = e.is_a?(Thrift::TransportException) && e.type.is_a?(String) && e.type =~ /Timed out/i

        # Thrift RPC timeout
        unwrapped_timeout = e.is_a?(Thrift::TransportException) && (e.type == Thrift::TransportException::TIMED_OUT)

        # Thrift socket disconnect
        unwrapped_disconnect = e.is_a?(Thrift::TransportException) && (e.type == Thrift::TransportException::NOT_OPEN)

        wrapped_timeout || bogus_timeout || unwrapped_timeout || unwrapped_disconnect
      end
    end # self

    attr_accessor :key, :attributes

    # Create column family object
    #
    # === Parameters
    # key(String):: Primary key for object
    # attrs(Hash):: Attributes for object which form Cassandra row
    #   with column name as key and column value as value
    def initialize(key, attrs = {})
      self.key = key
      self.attributes = attrs
    end

    # Store object in Cassandra
    #
    # === Return
    # true:: Always return true
    def save
      self.class.insert(key, attributes)
      true
    end

    # Load object from Cassandra without modifying this object
    #
    # === Return
    # (CassandraModel):: Object as stored in Cassandra
    def reload
      self.class.get(key)
    end

    # Reload object value from Cassandra and update this object
    #
    # === Return
    # (CassandraModel):: This object after reload from Cassandra
    def reload!
      self.attributes = self.class.real_get(key)
      self
    end

    # Column value
    #
    # === Parameters
    # key(String|Integer):: Column name or key
    #
    # === Return
    # (Object|nil):: Column value, or nil if not found
    def [](key)
      ret = attributes[key]
      return ret if ret
      if key.kind_of? Integer
        return attributes[Cassandra::Long.new(key)]
      end
    end

    # Store new column value
    #
    # === Parameters
    # key(String|Integer):: Column name or key
    # value(Object):: Value to be stored
    #
    # === Return
    # (Object):: Value stored
    def []=(key, value)
      attributes[key] = value
    end

    # Delete object from Cassandra
    #
    # === Return
    # true:: Always return true
    def destroy
      self.class.remove(key)
    end
  end # CassandraModel
end # RightSupport::DB