module CassandraObject
  module Persistence
    extend ActiveSupport::Concern
    included do
      class_inheritable_writer :write_consistency
      class_inheritable_writer :read_consistency
    end

    VALID_READ_CONSISTENCY_LEVELS = [:one, :quorum, :all]
    VALID_WRITE_CONSISTENCY_LEVELS = VALID_READ_CONSISTENCY_LEVELS

    module ClassMethods
      def consistency_levels(levels)
        if levels.has_key?(:write)
          unless valid_write_consistency_level?(levels[:write])
            raise ArgumentError, "Invalid write consistency level. Valid levels are: #{VALID_WRITE_CONSISTENCY_LEVELS.inspect}. You gave me #{levels[:write].inspect}"
          end
          self.write_consistency = levels[:write]
        end

        if levels.has_key?(:read)
          unless valid_read_consistency_level?(levels[:read])
            raise ArgumentError, "Invalid read consistency level. Valid levels are #{VALID_READ_CONSISTENCY_LEVELS.inspect}. You gave me #{levels[:write].inspect}"
          end
          self.read_consistency = levels[:read]
        end
      end

      def write_consistency
        read_inheritable_attribute(:write_consistency) || :quorum
      end

      def read_consistency
        read_inheritable_attribute(:read_consistency) || :quorum
      end

      def get(key, options = {})
        multi_get([key], options).values.first
      end

      def multi_get(keys, options = {})
        options = {:consistency => self.read_consistency, :limit => 100}.merge(options)
        unless valid_read_consistency_level?(options[:consistency])
          raise ArgumentError, "Invalid read consistency level: '#{options[:consistency]}'. Valid options are [:quorum, :one]"
        end

        attribute_results = ActiveSupport::Notifications.instrument("multi_get.cassandra_object", :keys => keys) do
          connection.multi_get(column_family, keys.map(&:to_s), :count=>options[:limit], :consistency=>consistency_for_thrift(options[:consistency]))
        end

        attribute_results.inject(ActiveSupport::OrderedHash.new) do |memo, (key, attributes)|
          if attributes.empty?
            memo[key] = nil
          else
            memo[parse_key(key)] = instantiate(key, attributes)
          end
          memo
        end
      end

      def remove(key)
        ActiveSupport::Notifications.instrument("remove.cassandra_object", :key => key) do
          connection.remove(column_family, key.to_s, :consistency => write_consistency_for_thrift)
        end
      end

      def delete_all
        connection.truncate!(column_family)
      end

      def all(keyrange = ''..'', options = {})
        count = options[:limit] || 100
        results = ActiveSupport::Notifications.instrument("get_range.cassandra_object", :start => keyrange.first, :finish => keyrange.last, :count => count) do
          connection.get_range(column_family, :start => keyrange.first, :finish => keyrange.last, :count => count)
        end

        results.map do |result|
          if result.columns.empty?
            nil
          else
            attributes = result.columns.inject(ActiveSupport::OrderedHash.new) do |memo, column|
              memo[column.column.name] = column.column.value
              memo
            end
            instantiate(result.key, attributes)
          end
        end.compact
      end

      def first(keyrange = ''..'', options = {})
        all(keyrange, options.merge(:limit=>1)).first
      end

      def create(attributes)
        new(attributes).tap do |object|
          object.save
        end
      end

      def write(key, attributes, schema_version)
        key.tap do |key|
          attributes = encode_columns_hash(attributes, schema_version)
          ActiveSupport::Notifications.instrument("insert.cassandra_object", :key => key, :attributes => attributes) do
            connection.insert(column_family, key.to_s, attributes, :consistency => write_consistency_for_thrift)
          end
        end
      end

      def instantiate(key, attributes)
        # remove any attributes we don't know about. we would do this earlier, but we want to make such
        #  attributes available to migrations
        attributes.delete_if{|k,_| !model_attributes.keys.include?(k)}
        allocate.tap do |object|
          object.instance_variable_set("@schema_version", attributes.delete('schema_version'))
          object.instance_variable_set("@key", parse_key(key))
          object.instance_variable_set("@attributes", decode_columns_hash(attributes).with_indifferent_access)
        end
      end

      def encode_columns_hash(attributes, schema_version)
        attributes.inject(Hash.new) do |memo, (column_name, value)|
          # cassandra stores bytes, not strings, so it has no concept of encodings. The ruby thrift gem 
          # expects all strings to be encoded as ascii-8bit.
          memo[column_name.to_s] = model_attributes[column_name].converter.encode(value).force_encoding('ASCII-8BIT')
          memo
        end.merge({"schema_version" => schema_version.to_s})
      end

      def decode_columns_hash(attributes)
        attributes.inject(Hash.new) do |memo, (column_name, value)|
          memo[column_name.to_s] = model_attributes[column_name].converter.decode(value)
          memo
        end
      end
      
      def column_family_configuration
        [{:Name=>column_family, :CompareWith=>"UTF8Type"}]
      end

      protected
      def valid_read_consistency_level?(level)
        !!VALID_READ_CONSISTENCY_LEVELS.include?(level)
      end

      def valid_write_consistency_level?(level)
        !!VALID_WRITE_CONSISTENCY_LEVELS.include?(level)
      end

      def write_consistency_for_thrift
        consistency_for_thrift(write_consistency)
      end

      def read_consistency_for_thrift
        consistency_for_thrift(read_consistency)
      end

      def consistency_for_thrift(consistency)
        {
          :one    => Cassandra::Consistency::ONE, 
          :quorum => Cassandra::Consistency::QUORUM,
          :all    => Cassandra::Consistency::ALL
        }[consistency]
      end
    end

    module InstanceMethods
      def save
        _run_save_callbacks do
          create_or_update
        end
      end
      
      def create_or_update
        result = persisted? ? update : create
        result != false
      end
      
      def create
        _run_create_callbacks do
          @key ||= self.class.next_key(self)
          _write
          @persisted = true
          @key
        end
      end
      
      def update
        _run_update_callbacks do
          _write
        end
      end
      
      def _write
        changed_attributes = changed.inject({}) { |h, n| h[n] = read_attribute(n); h }
        self.class.write(key, changed_attributes, schema_version)
      end

      def new_record?
        !@persisted
      end

      def destroyed?
        @destroyed
      end

      def persisted?
        @persisted && !destroyed?
      end

      def destroy
        _run_destroy_callbacks do 
          self.class.remove(key)
          @destroyed = true
          freeze
        end
      end
      
      def reload
        self.class.get(self.key)
      end
      
    end
  end
end