=begin rdoc
Create a new Cassandra client instance. Accepts a keyspace name and,
optionally, a server list (a single "host:port" string or an array of them)
and a hash of Thrift client options.

  client = Cassandra.new('twitter', '127.0.0.1:9160')

You can then make calls to the server via the client instance.

  client.insert(:UserRelationships, "5", {"user_timeline" => {UUID.new => "1"}})
  client.get(:UserRelationships, "5", "user_timeline")

For read methods, valid option parameters are:

:count:: How many results to return. Defaults to 100.
:start:: Column name token at which to start iterating, inclusive. Defaults
         to nil, which means the first column in the collation order.
:finish:: Column name token at which to stop iterating, inclusive. Defaults
          to nil, which means no boundary.
:reversed:: Swap the direction of the collation order.
:consistency:: The consistency level of the request. Defaults to
               Cassandra::Consistency::ONE (one node must respond). Other
               valid options are Cassandra::Consistency::ZERO,
               Cassandra::Consistency::QUORUM, and
               Cassandra::Consistency::ALL.

Note that some read options have no relevance in some contexts.

For write methods, valid option parameters are:

:timestamp:: The transaction timestamp. Defaults to the current time in
             milliseconds. This is used for conflict resolution by the
             server; you normally never need to change it.
:consistency:: See above.
=end rdoc

class Cassandra
  include Columns
  include Protocol

  class AccessError < StandardError #:nodoc:
  end

  module Consistency
    include CassandraThrift::ConsistencyLevel
  end

  MAX_INT = 2**31 - 1

  WRITE_DEFAULTS = {
    :count => MAX_INT,
    :timestamp => nil,
    :consistency => Consistency::ONE
  }.freeze

  READ_DEFAULTS = {
    :count => 100,
    :start => nil,
    :finish => nil,
    :reversed => false,
    :consistency => Consistency::ONE
  }.freeze

  THRIFT_DEFAULTS = {
    :transport => Thrift::BufferedTransport
  }.freeze

  # +schema+ is defined explicitly (public) further down because it takes an
  # optional argument; listing it here would only be overwritten.
  attr_reader :keyspace, :servers, :thrift_client_options

  # Create a new Cassandra instance. No connection is made here; the Thrift
  # client is created lazily by #client on first use.
  #
  # keyspace:: name of the keyspace all requests are made against.
  # servers:: a "host:port" string or an array of them.
  # thrift_client_options:: merged over THRIFT_DEFAULTS and handed to ThriftClient.
  def initialize(keyspace, servers = "127.0.0.1:9160", thrift_client_options = {})
    # Per-column-family lookup state, presumably filled in by the Columns
    # helpers (is_super, column_name_class, ...) -- TODO confirm.
    @is_super = {}
    @column_name_class = {}
    @sub_column_name_class = {}
    @thrift_client_options = THRIFT_DEFAULTS.merge(thrift_client_options)
    @keyspace = keyspace
    @servers = Array(servers)
  end

  # The underlying ThriftClient, created and memoized on first use.
  # Raises AccessError if the server does not list this instance's keyspace.
  def client
    @client ||= begin
      thrift = ThriftClient.new(CassandraThrift::Cassandra::Client, @servers, @thrift_client_options)
      # Report the list fetched right here: calling #keyspaces would re-enter
      # #client while @client is still nil and recurse without end.
      available = thrift.get_string_list_property("keyspaces")
      unless available.include?(@keyspace)
        raise AccessError, "Keyspace #{@keyspace.inspect} not found. Available: #{available.inspect}"
      end
      thrift
    end
  end

  # Names of the keyspaces reported by the server (fetched once, then memoized).
  def keyspaces
    @keyspaces ||= client.get_string_list_property("keyspaces")
  end

  # Keyspace description returned by the server's describe_keyspace, memoized.
  # With load = false and no schema fetched yet, returns [] without touching
  # the server (used by #inspect so that inspecting never opens a connection).
  def schema(load = true)
    if !load && !@schema
      []
    else
      @schema ||= client.describe_keyspace(@keyspace)
    end
  end

  # Summarize the instance: object id, keyspace, the column families already
  # known from the schema (name => type), and the server list.
  def inspect
    column_families = schema(false).map { |name, hash| ":#{name} => #{hash['type'].inspect}" }.join(', ')
    "#<Cassandra:#{object_id}, @keyspace=#{keyspace.inspect}, @schema={#{column_families}}, @servers=#{servers.inspect}>"
  end

  ### Write

  # Insert a row for a key. Pass a flat hash for a regular column family, and
  # a nested hash for a super column family. Supports the :consistency
  # and :timestamp options.
  #
  # Inside a #batch block the mutation is queued instead of being sent.
  def insert(column_family, key, hash, options = {})
    column_family, _, _, options = validate_params(column_family, key, [options], WRITE_DEFAULTS)

    timestamp = options[:timestamp] || Time.stamp
    cfmap = hash_to_cfmap(column_family, hash, timestamp)

    # The consistency level is kept as the LAST argument; #batch relies on it.
    mutation = [:insert, [key, cfmap, options[:consistency]]]
    @batch ? @batch << mutation : _insert(*mutation[1])
  end

  ### Delete

  # Remove the element at the column_family:key:[column]:[sub_column]
  # path you request. Supports the :consistency and :timestamp options.
  #
  # Inside a #batch block the mutation is queued instead of being sent.
  def remove(column_family, key, *columns_and_options)
    column_family, column, sub_column, options = validate_params(column_family, key, columns_and_options, WRITE_DEFAULTS)

    args = {:column_family => column_family}
    columns = is_super(column_family) ? {:super_column => column, :column => sub_column} : {:column => column}
    column_path = CassandraThrift::ColumnPath.new(args.merge(columns))

    # The consistency level is kept as the LAST argument; #batch relies on it.
    mutation = [:remove, [key, column_path, options[:timestamp] || Time.stamp, options[:consistency]]]
    @batch ? @batch << mutation : _remove(*mutation[1])
  end

  # Remove all rows in the column family you request. Supports options
  # :consistency and :timestamp.
  # FIXME May not currently delete all records without multiple calls. Waiting
  # for ranged remove support in Cassandra. (get_range runs with its default
  # :count of 100, so at most 100 keys are removed per call.)
  def clear_column_family!(column_family, options = {})
    get_range(column_family).each { |key| remove(column_family, key, options) }
  end

  # Remove all rows in the keyspace. Supports options :consistency and
  # :timestamp.
  # FIXME May not currently delete all records without multiple calls. Waiting
  # for ranged remove support in Cassandra.
  def clear_keyspace!(options = {})
    schema.keys.each { |column_family| clear_column_family!(column_family, options) }
  end

  ### Read

  # Count the elements at the column_family:key:[super_column] path you
  # request. Supports the :consistency option.
  def count_columns(column_family, key, *columns_and_options)
    column_family, super_column, _, options = validate_params(column_family, key, columns_and_options, READ_DEFAULTS)
    _count_columns(column_family, key, super_column, options[:consistency])
  end

  # Multi-key version of Cassandra#count_columns, returning an OrderedHash of
  # key => count in the order the keys were given. Supports the :consistency
  # option (other read options are accepted but not used by count_columns).
  # FIXME Not real multi; needs server support
  def multi_count_columns(column_family, keys, *options)
    ordered_map(keys) { |key| count_columns(column_family, key, *options) }
  end

  # Return a list of single values for the elements at the
  # column_family:key:column[s]:[sub_columns] path you request. Supports the
  # :consistency option.
  def get_columns(column_family, key, *columns_and_options)
    column_family, columns, sub_columns, options = validate_params(column_family, key, columns_and_options, READ_DEFAULTS)
    _get_columns(column_family, key, columns, sub_columns, options[:consistency])
  end

  # Multi-key version of Cassandra#get_columns, returning an OrderedHash of
  # key => values in the order the keys were given. Supports the :consistency
  # option.
  # FIXME Not real multi; needs to use a Column predicate
  def multi_get_columns(column_family, keys, *options)
    ordered_map(keys) { |key| get_columns(column_family, key, *options) }
  end

  # Return a hash (actually, a Cassandra::OrderedHash) or a single value
  # representing the element at the column_family:key:[column]:[sub_column]
  # path you request. Supports options :count, :start, :finish, :reversed,
  # and :consistency.
  def get(column_family, key, *columns_and_options)
    multi_get(column_family, [key], *columns_and_options)[key]
  end

  # Multi-key version of Cassandra#get. Supports options :count, :start,
  # :finish, :reversed, and :consistency.
  #
  # Returns an OrderedHash in the order the keys were requested. A key with
  # no data maps to an empty OrderedHash when reading a whole super column
  # family row (no sub column given), and to nil otherwise.
  def multi_get(column_family, keys, *columns_and_options)
    column_family, column, sub_column, options = validate_params(column_family, keys, columns_and_options, READ_DEFAULTS)

    hash = _multiget(column_family, keys, column, sub_column,
                     options[:count], options[:start], options[:finish],
                     options[:reversed], options[:consistency])

    # Rebuild the result in request order.
    ordered_map(keys) { |key| hash[key] || (OrderedHash.new if is_super(column_family) && !sub_column) }
  end

  # Check whether the column_family:key:[column]:[sub_column] path you
  # request exists. Returns the data _multiget yields for the key (truthy when
  # present), not a strict true/false. Supports the :consistency option.
  def exists?(column_family, key, *columns_and_options)
    column_family, column, sub_column, options = validate_params(column_family, key, columns_and_options, READ_DEFAULTS)
    _multiget(column_family, [key], column, sub_column, 1, nil, nil, nil, options[:consistency])[key]
  end

  # Return a list of keys in the column_family you request. Requires the
  # cluster to use Cassandra's OrderPreservingPartitioner. Supports the
  # :count, :start, :finish, and :consistency options.
  def get_range(column_family, options = {})
    column_family, _, _, options = validate_params(column_family, "", [options], READ_DEFAULTS)
    _get_range(column_family, options[:start].to_s, options[:finish].to_s, options[:count], options[:consistency])
  end

  # Count all rows in the column_family you request. Requires the cluster to
  # use Cassandra's OrderPreservingPartitioner. Supports the :start, :finish,
  # and :consistency options.
  # FIXME will count only MAX_INT records
  def count_range(column_family, options = {})
    get_range(column_family, options.merge(:count => MAX_INT)).size
  end

  # Open a batch operation and yield. Inserts and removes issued inside the
  # block are queued; when the block returns they are sent to the server one
  # at a time, in order (this is not atomic). If the block raises, the queued
  # mutations are discarded.
  #
  # Supports the :consistency option. When given, it overrides the
  # consistency set in the individual commands.
  def batch(options = {})
    # validate_params always fills in a default consistency, so note whether
    # the caller actually asked for one before validating.
    override_consistency = options.has_key?(:consistency)
    _, _, _, options = validate_params(schema.keys.first, "", [options], WRITE_DEFAULTS)

    @batch = []
    yield
    compact_mutations!

    @batch.each do |type, args|
      # Both mutation kinds carry the consistency level as their last argument.
      args = args[0..-2] + [options[:consistency]] if override_consistency
      case type
      when :insert then _insert(*args)
      when :remove then _remove(*args)
      end
    end
  ensure
    @batch = nil
  end

  private

  # Build an OrderedHash mapping each key, in the given order, to the value
  # the block returns for it.
  def ordered_map(keys)
    result = OrderedHash.new
    keys.each { |key| result[key] = yield(key) }
    result
  end

  # Extract and validate options.
  #
  # Checks that every key is a String and that a trailing options hash in
  # +args+ (popped off +args+ if present) only uses keys found in +defaults+.
  # Returns [column_family, column, sub_column, options]: the column family as
  # a String, the column names converted by s_map, and +defaults+ merged with
  # the caller's options (:start/:finish normalized to Strings, "" when unset).
  # Raises ArgumentError on a non-String key or an unknown option.
  # FIXME Should be done as a decorator
  def validate_params(column_family, keys, args, defaults)
    options = defaults.dup
    column_family = column_family.to_s

    # Keys
    [keys].flatten.each do |key|
      raise ArgumentError, "Key #{key.inspect} must be a String for #{calling_method}" unless key.is_a?(String)
    end

    # Options
    if args.last.is_a?(Hash)
      extras = args.last.keys - options.keys
      raise ArgumentError, "Invalid options #{extras.inspect[1..-2]} for #{calling_method}" unless extras.empty?
      options.merge!(args.pop)
    end

    # Ranges: once a (super) column is named, :start/:finish refer to its
    # sub columns, so they are converted with the sub column name class.
    column, sub_column = args[0], args[1]
    klass, sub_klass = column_name_class(column_family), sub_column_name_class(column_family)
    range_class = column ? sub_klass : klass
    options[:start] = options[:start] ? range_class.new(options[:start]).to_s : ""
    options[:finish] = options[:finish] ? range_class.new(options[:finish]).to_s : ""

    [column_family, s_map(column, klass), s_map(sub_column, sub_klass), options]
  end

  # Name of the public method whose arguments are being validated, e.g.
  # "Cassandra#insert", for use in error messages. Walks the backtrace past
  # validate_params' own frames (including its +each+ block) instead of
  # trusting a fixed depth, and parses both the "in `name'" frame format and
  # Ruby 3.4's "in 'Class#name'" format.
  def calling_method
    internal = %w[calling_method validate_params each]
    labels = caller.map { |frame| frame[/:in [`']([^']*)'/, 1] }.compact
    names = labels.map { |label| label.sub(/\Ablock (?:\(\d+ levels\) )?in /, '').split(/[#.]/).last }
    "#{self.class}##{names.find { |name| !internal.include?(name) }}"
  end

  # Convert a column name (or an array of them, recursively) to its String
  # form via +klass+; nil stays nil.
  def s_map(el, klass)
    case el
    when Array then el.map { |i| s_map(i, klass) }
    when NilClass then nil
    else klass.new(el).to_s
    end
  end

  # Roll up queued mutations, to improve atomicity.
  # Currently a no-op: the queue is dispatched exactly as recorded.
  def compact_mutations!
    #TODO re-do this rollup
  end
end