# frozen_string_literal: true

require "cassandra"
require "monitor"

# This class provides a lightweight wrapper around the Cassandra driver. It provides
# a foundation for maintaining a connection and constructing CQL statements.
class Cassie
  require File.expand_path("../cassie/config.rb", __FILE__)
  require File.expand_path("../cassie/subscribers.rb", __FILE__)
  require File.expand_path("../cassie/model.rb", __FILE__)
  require File.expand_path("../cassie/schema.rb", __FILE__)
  require File.expand_path("../cassie/testing.rb", __FILE__)
  require File.expand_path("../cassie/railtie.rb", __FILE__) if defined?(Rails)

  class RecordNotFound < StandardError
  end

  # Error carrying the record whose validation errors are summarized in the message.
  class RecordInvalid < StandardError
    attr_reader :record

    def initialize(record)
      super("Errors on #{record.class.name}: #{record.errors.to_hash.inspect}")
      @record = record
    end
  end

  # Message passed to subscribers with the statement, options, and elapsed time (in
  # seconds) for each executed statement. Note that if statements are batched they
  # are packed into one message carrying the single batch statement.
  class Message
    attr_reader :statement, :options, :elapsed_time

    def initialize(statement, options, elapsed_time)
      @statement = statement
      @options = options
      @elapsed_time = elapsed_time
    end
  end

  attr_reader :config, :subscribers
  attr_accessor :consistency

  class << self
    # A singleton instance that can be shared to communicate with a Cassandra cluster.
    # It is built lazily from the configuration set by configure!.
    def instance
      unless defined?(@instance) && @instance
        instance = new(@config)
        @instance = instance
      end
      @instance
    end

    # Call this method to build the Cassie::Config from the given options. Any
    # existing singleton instance is discarded and disconnected so that the next
    # call to instance picks up the new configuration.
    def configure!(options)
      if defined?(@instance) && @instance
        old_instance = @instance
        @instance = nil
        old_instance.disconnect
      end
      @config = Cassie::Config.new(options)
    end

    # This method can be used to set a consistency level for all Cassandra queries
    # within a block that don't explicitly define them. It can be used where consistency
    # is important (i.e. on validation queries) but where a higher level method
    # doesn't provide an option to set it.
    #
    # The level is stored per thread and the previous value is restored when the
    # block exits, even if it raises. A block is required.
    def consistency(level)
      save_val = Thread.current[:cassie_consistency]
      begin
        Thread.current[:cassie_consistency] = level
        yield
      ensure
        Thread.current[:cassie_consistency] = save_val
      end
    end

    # Get a Logger compatible object if it has been set.
    def logger
      @logger if defined?(@logger)
    end

    # Set a logger with a Logger compatible object.
    attr_writer :logger
  end

  # Build a wrapper for the given config. No connection is opened here; the session
  # is established lazily on first use or by calling connect. The default consistency
  # comes from the :consistency entry of the cluster config, falling back to :local_one.
  def initialize(config)
    @config = config
    @monitor = Monitor.new
    @session = nil
    @prepared_statements = {}
    @last_prepare_warning = Time.now
    @subscribers = Subscribers.new
    @consistency = ((config.cluster || {})[:consistency] || :local_one)
  end

  # Open a connection to the Cassandra cluster. Does nothing if a session is
  # already established.
  def connect
    # Don't build (and then abandon) a second cluster when we are already connected.
    return if connected?

    started_at = monotonic_now
    cluster_config = config.cluster
    cluster_config = cluster_config.merge(logger: logger) if logger
    cluster = Cassandra.cluster(cluster_config)
    elapsed_ms = ((monotonic_now - started_at) * 1000).round
    logger&.info("Cassie.connect with #{config.sanitized_cluster} in #{elapsed_ms}ms")

    unused_cluster = nil
    @monitor.synchronize do
      if @session
        # Another thread connected while this cluster was being created.
        unused_cluster = cluster
      else
        @session = cluster.connect(config.default_keyspace)
        @prepared_statements = {}
      end
    end
    # Release the cluster we did not end up using rather than leaking its connections.
    unused_cluster&.close
  end

  # Close the connections to the Cassandra cluster. The session reference and the
  # prepared statement cache are cleared under the lock; the session itself is
  # closed after the lock has been released.
  def disconnect
    logger&.info("Cassie.disconnect from #{config.sanitized_cluster}")
    old_session = nil
    @monitor.synchronize do
      old_session = @session
      @session = nil
      @prepared_statements = {}
    end
    old_session&.close
  end

  # Return true if the connection to the Cassandra cluster has been established.
  def connected?
    !!@session
  end

  # Force reconnection. If you're using this code in conjunction with a forking server
  # environment like passenger or unicorn you should call this method after forking.
  def reconnect
    existing_session = @session
    @monitor.synchronize do
      # Skip the reconnect if the session was already replaced while we waited for the lock.
      if @session == existing_session
        disconnect
        connect
      end
    end
  end

  # Prepare a CQL statement for repeated execution. Prepared statements
  # are cached until the connection is closed. Calling prepare multiple
  # times with the same CQL string will return the prepared statement
  # from the cache.
  #
  # Raises ArgumentError if cql is not a String.
  def prepare(cql)
    raise ArgumentError, "CQL must be a string" unless cql.is_a?(String)

    statement = @prepared_statements[cql]
    cache_filled_up = false
    unless statement
      @monitor.synchronize do
        statement = session.prepare(cql)
        @prepared_statements[cql] = statement
        if @prepared_statements.size > config.max_prepared_statements
          # Cache is full. Clear out the oldest values. Ideally we'd remove the least recently used,
          # but that would require additional overhead on each query. This method will eventually
          # keep the most active queries in the cache and is overall more efficient.
          @prepared_statements.delete(@prepared_statements.first[0])
          cache_filled_up = true
        end
      end
    end

    if cache_filled_up && logger && Time.now > @last_prepare_warning + 10
      # Set a throttle on how often this message is logged so we don't kill performance even more.
      @last_prepare_warning = Time.now
      logger.warn("Cassie.prepare cache filled up. Consider increasing the size from #{config.max_prepared_statements}.")
    end

    statement
  end

  # Declare and execute a batch statement. Any insert, update, or delete
  # calls made within the block will add themselves to the batch which
  # is executed at the end of the block.
  #
  # When called inside another batch block the inner block simply joins the
  # outer batch (and the inner options are not used).
  def batch(options = nil)
    return yield if Thread.current[:cassie_batch]

    begin
      queued = []
      Thread.current[:cassie_batch] = queued
      yield
      unless queued.empty?
        batch_statement = session.logged_batch
        queued.each do |cql, values|
          if values.blank?
            batch_statement.add(cql)
          else
            batch_statement.add(prepare(cql).bind(Array(values)))
          end
        end
        execute(batch_statement, nil, options)
      end
    ensure
      Thread.current[:cassie_batch] = nil
    end
  end

  # Find rows using the CQL statement. If the statement is a string
  # and values are provided then the statement will be executed as a prepared
  # statement. In general all statements should be executed this way.
  #
  # If you have a statement without arguments, then you should call
  # prepare before and pass the prepared statement if you plan on
  # executing the same query multiple times.
  def find(cql, values = nil, options = nil)
    execute(cql, values, options)
  end

  # Insert a row from a hash into a table. Columns whose value is nil are skipped.
  #
  # You can specify a ttl for the created row by supplying a :ttl option.
  #
  # If this method is called inside a batch block it will be executed in the batch.
  #
  # Raises ArgumentError if there is no non-nil value to insert.
  def insert(table, values_hash, options = nil)
    columns = []
    values = []
    values_hash.each do |column, value|
      next if value.nil?

      columns << column
      values << value
    end

    # Fail with a clear message instead of the "negative argument" error that
    # building the placeholder list for zero columns used to produce.
    if columns.empty?
      raise ArgumentError, "Cannot insert into #{table} without at least one non-nil value"
    end

    cql = "INSERT INTO #{table} (#{columns.join(", ")}) VALUES (#{question_marks(columns.size)})"

    if options&.include?(:ttl)
      # Work on a copy so the caller's options hash is not modified.
      options = options.dup
      ttl = options.delete(:ttl)
      if ttl
        cql += " USING TTL ?"
        values << Integer(ttl)
      end
    end

    batch_or_execute(cql, values, options)
  end

  # Update a row in a table. The values to update should be passed in the
  # values_hash while the primary key should be passed in the key_hash.
  # If values_hash is a String it is used verbatim as the SET clause.
  #
  # You can specify a ttl for the updated row by supplying a :ttl option.
  #
  # If this method is called inside a batch block it will be executed in the batch.
  def update(table, values_hash, key_hash, options = nil)
    key_cql, key_values = key_clause(key_hash)
    update_cql = []
    update_values = []
    if values_hash.is_a?(String)
      update_cql << values_hash
    else
      values_hash.each do |column, value|
        update_cql << "#{column} = ?"
        update_values << value
      end
    end
    values = update_values + key_values

    cql = "UPDATE #{table}"
    if options&.include?(:ttl)
      # Work on a copy so the caller's options hash is not modified.
      options = options.dup
      ttl = options.delete(:ttl)
      if ttl
        cql += " USING TTL ?"
        # The TTL placeholder comes before the SET and WHERE placeholders.
        values.unshift(Integer(ttl))
      end
    end
    cql += " SET #{update_cql.join(", ")} WHERE #{key_cql}"

    batch_or_execute(cql, values, options)
  end

  # Delete a row from a table. You should pass the primary key value
  # in the key_hash.
  #
  # If this method is called inside a batch block it will be executed in the batch.
  def delete(table, key_hash, options = nil)
    key_cql, key_values = key_clause(key_hash)
    cql = "DELETE FROM #{table} WHERE #{key_cql}"
    batch_or_execute(cql, key_values, options)
  end

  # Execute an arbitrary CQL statement. If values are passed and the statement is a
  # string, it will be prepared and executed as a prepared statement. A string
  # without values is wrapped in a simple statement; anything else is passed to the
  # session as is.
  #
  # Subscribers are notified with a Message after every attempt, whether or not it
  # succeeded. On Cassandra::Errors::NoHostsAvailable the connection is re-established
  # and the error is re-raised.
  def execute(cql, values = nil, options = nil)
    started_at = monotonic_now
    statement = nil

    statement = if cql.is_a?(String)
      values.present? ? prepare(cql) : Cassandra::Statements::Simple.new(cql)
    else
      cql
    end

    if values.present?
      values = Array(values)
      options = (options ? options.merge(arguments: values) : {arguments: values})
    end

    # Set a default consistency from a block context if it isn't explicitly set.
    statement_consistency = current_consistency
    if statement_consistency
      if options
        options = options.merge(consistency: statement_consistency) if options[:consistency].nil?
      else
        options = {consistency: statement_consistency}
      end
    end

    session.execute(statement, options || {})
  rescue Cassandra::Errors::NoHostsAvailable
    reconnect
    raise
  ensure
    # statement is still nil here if building it failed, in which case nothing is published.
    if statement.is_a?(Cassandra::Statement) && !subscribers.empty?
      payload = Message.new(statement, options, monotonic_now - started_at)
      subscribers.each { |subscriber| subscriber.call(payload) }
    end
  end

  # Return the current consistency level that has been set for statements. A level
  # set for the current thread with Cassie.consistency takes precedence over the
  # instance level.
  def current_consistency
    Thread.current[:cassie_consistency] || consistency
  end

  private

  # The logger is shared at the class level.
  def logger
    self.class.logger
  end

  # Return the session, connecting first if necessary.
  def session
    unless connected?
      # Check again inside the monitor lock so we don't get in a race condition
      # where another thread has already established the connection.
      @monitor.synchronize do
        connect unless connected?
      end
    end
    @session
  end

  # Queue the statement on the current thread's batch if there is one (returning nil);
  # otherwise execute it immediately.
  def batch_or_execute(cql, values, options = nil)
    batch = Thread.current[:cassie_batch]
    if batch
      batch << [cql, values]
      nil
    else
      execute(cql, values, options)
    end
  end

  # Build a comma separated list of bind placeholders, e.g. "?,?,?" for 3.
  # Returns an empty string for 0.
  def question_marks(size)
    Array.new(size, "?").join(",")
  end

  # Turn a hash of key columns into a pair of the "a = ? AND b = ?" clause
  # and the list of values to bind to it.
  def key_clause(key_hash)
    cql = []
    values = []
    key_hash.each do |key, value|
      cql << "#{key} = ?"
      values << value
    end
    [cql.join(" AND "), values]
  end

  # Extract the CQL from a statement. Statements that respond to statements instead
  # of cql are expanded recursively with the parts joined by "; ". The previous list
  # tracks statements already visited so a statement is never expanded twice.
  # Returns nil if no CQL can be extracted.
  def statement_cql(statement, previous = nil)
    cql = nil
    if statement.respond_to?(:cql)
      cql = statement.cql
    elsif statement.respond_to?(:statements) && (previous.nil? || !previous.include?(statement))
      previous ||= []
      previous << statement
      cql = statement.statements.map { |s| statement_cql(s, previous) }.join("; ")
    end
    cql
  end

  # Current reading of the monotonic clock in seconds. Used for measuring durations
  # so that results are not affected by changes to the system clock.
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end