module ActiveRecord::Turntable
  # Mixin that adds shard ("turntable") configuration, connection handling and
  # transaction helpers to the class that includes it.
  module Base
    extend ActiveSupport::Concern

    included do
      class_attribute :turntable_connections,
                      :turntable_clusters,
                      :turntable_enabled,
                      :turntable_sequencer_enabled

      self.turntable_connections = {}
      # Two-level hash: cluster name => { model class => cluster }.
      # The inner hash is created on first access to a cluster name.
      self.turntable_clusters = Hash.new { |h, k| h[k] = {} }
      self.turntable_enabled = false
      self.turntable_sequencer_enabled = false

      class << self
        delegate :shards_transaction, :with_all, :to => :connection
      end
    end

    module ClassMethods
      # Enables turntable for this class: records the cluster and shard key,
      # builds the Cluster, registers it in +turntable_clusters+, swaps the
      # connection pool and defines the cluster-level transaction helpers.
      #
      # @param [Symbol] cluster_name cluster name for this class
      # @param [Symbol] shard_key_name shard key attribute name
      # @param [Hash] options passed through to Cluster.new
      def turntable(cluster_name, shard_key_name, options = {})
        class_attribute :turntable_shard_key,
                        :turntable_cluster,
                        :turntable_cluster_name

        self.turntable_enabled = true
        self.turntable_cluster_name = cluster_name
        self.turntable_shard_key = shard_key_name
        self.turntable_cluster = Cluster.new(
          self,
          turntable_config[:clusters][cluster_name],
          options
        )
        self.turntable_clusters[cluster_name][self] = turntable_cluster

        turntable_replace_connection_pool
        turntable_define_cluster_methods(cluster_name)
      end

      # Connects every configured shard, then runs the block inside nested
      # transactions: one per pool in +turntable_connections+ plus one on
      # ActiveRecord::Base.connection_pool.
      #
      # @param [Hash] options passed to each +transaction+ call
      def force_transaction_all_shards!(options={}, &block)
        force_connect_all_shards!
        shards = turntable_connections.values
        shards += [ActiveRecord::Base.connection_pool]
        recursive_transaction(shards, options, &block)
      end

      # Opens a transaction on the first pool and recurses into the remaining
      # ones; the block runs inside the transaction of the last pool.
      #
      # NOTE: +pools+ is consumed (shifted) while recursing, so callers should
      # pass an array they do not need afterwards.
      #
      # @param [Array] pools connection pools to wrap, outermost first
      # @param [Hash] options passed to each +transaction+ call
      def recursive_transaction(pools, options, &block)
        pool = pools.shift
        if pools.present?
          pool.connection.transaction(options) do
            recursive_transaction(pools, options, &block)
          end
        else
          pool.connection.transaction(options, &block)
        end
      end

      # Creates a connection pool for every entry under the "shards" and "seq"
      # keys of the current Rails environment's database configuration, unless
      # a pool is already registered under that name.
      def force_connect_all_shards!
        conf = configurations[Rails.env]
        shards = {}
        shards = shards.merge(conf["shards"]) if conf["shards"]
        shards = shards.merge(conf["seq"]) if conf["seq"]
        shards.each do |name, config|
          turntable_connections[name] ||=
            ActiveRecord::ConnectionAdapters::ConnectionPool.new(spec_for(config))
        end
      end

      # Registers a PoolProxy wrapping this class's cluster connection proxy
      # with the connection handler, keyed by this class's name.
      def turntable_replace_connection_pool
        ch = connection_handler
        cp = turntable_cluster.connection_proxy
        pp = PoolProxy.new(cp)
        ch.class_to_pool.clear if defined?(ch.class_to_pool)
        ch.send(:class_to_pool)[name] = ch.send(:owner_to_pool)[name] = pp
      end

      # Builds a ConnectionSpecification for the given shard configuration,
      # requiring the matching adapter first.
      #
      # @param [Hash] config database configuration; the 'adapter' key is read
      # @raise [RuntimeError] when the adapter file cannot be loaded
      def spec_for(config)
        begin
          require "active_record/connection_adapters/#{config['adapter']}_adapter"
        rescue LoadError => e
          raise "Please install the #{config['adapter']} adapter: `gem install activerecord-#{config['adapter']}-adapter` (#{e})"
        end
        adapter_method = "#{config['adapter']}_connection"

        ActiveRecord::ConnectionAdapters::ConnectionSpecification.new(config, adapter_method)
      end

      # Disconnects every pool registered in +turntable_connections+.
      def clear_all_connections!
        turntable_connections.values.each do |pool|
          pool.disconnect!
        end
      end

      # Enables the sequencer for this class and builds it via Sequencer.build.
      #
      # @param sequence_name name handed to Sequencer.build
      # @param args extra arguments forwarded to Sequencer.build
      def sequencer(sequence_name, *args)
        class_attribute :turntable_sequencer

        self.turntable_sequencer_enabled = true
        self.turntable_sequencer = ActiveRecord::Turntable::Sequencer.build(self, sequence_name, *args)
      end

      # @return the value of +turntable_enabled+
      def turntable_enabled?
        turntable_enabled
      end

      # @return the value of +turntable_sequencer_enabled+
      def sequencer_enabled?
        turntable_sequencer_enabled
      end

      # @return the connection's current sequence value for this class's
      #   sequence, or nil when the sequencer is not enabled
      def current_sequence
        connection.current_sequence_value(self.sequence_name) if sequencer_enabled?
      end

      # @return the shard selected by the cluster for the current sequence
      #   value, or nil when the sequencer is not enabled
      def current_last_shard
        turntable_cluster.select_shard(current_sequence) if sequencer_enabled?
      end

      # Picks a shard at random, proportionally to the weights reported by
      # +turntable_cluster.weighted_shards+, and runs the block against it via
      # +with_recursive_shards+.
      #
      # @param klasses classes forwarded to +with_recursive_shards+
      def weighted_random_shard_with(*klasses, &block)
        shards_weight = self.turntable_cluster.weighted_shards
        sum = shards_weight.values.inject(&:+)
        idx = rand(sum)
        # Walk the shards subtracting each weight; the first one that drives
        # the counter below zero is the chosen shard.
        shard, _weight = shards_weight.find { |_k, v| (idx -= v) < 0 }
        self.connection.with_recursive_shards(shard.name, *klasses, &block)
      end

      # Runs the block with the connection pinned to a shard.
      #
      # @param any_shard a Numeric shard key value, an ActiveRecord::Base
      #   instance (its shard key attribute is used), or any other object,
      #   which is handed to +connection.with_shard+ unchanged
      def with_shard(any_shard)
        shard = case any_shard
                when Numeric
                  turntable_cluster.shard_for(any_shard)
                when ActiveRecord::Base
                  turntable_cluster.shard_for(any_shard.send(any_shard.turntable_shard_key))
                else
                  # Previously referenced an undefined local (shard_or_key),
                  # which raised NameError for every argument reaching here.
                  any_shard
                end
        connection.with_shard(shard) { yield }
      end

      private

      # Defines the helper methods associated with +cluster_name+.
      def turntable_define_cluster_methods(cluster_name)
        turntable_define_cluster_class_methods(cluster_name)
      end

      # Defines, on ActiveRecord::Base's singleton class:
      #   * <cluster_name>_transaction(shards, options)
      #   * all_cluster_transaction(options)
      #   * recursive_cluster_transaction(clusters, options)
      #
      # Each uses the first cluster object registered per cluster name in
      # +turntable_clusters+.
      #
      # NOTE(review): the +respond_to?+ guards are evaluated with the singleton
      # class as receiver, so they look like they are always true and the
      # methods are simply redefined on every call — confirm whether
      # +method_defined?+ was intended.
      def turntable_define_cluster_class_methods(cluster_name)
        (class << ActiveRecord::Base; self; end).class_eval <<-EOD
          unless respond_to?(:#{cluster_name}_transaction)
            def #{cluster_name}_transaction(shards = [], options = {})
              cluster = turntable_clusters[#{cluster_name.inspect}].values.first
              cluster.shards_transaction(shards, options) { yield }
            end
          end

          unless respond_to?(:all_cluster_transaction)
            def all_cluster_transaction(options = {})
              clusters = turntable_clusters.values.map { |v| v.values.first }
              recursive_cluster_transaction(clusters, options) { yield }
            end

            def recursive_cluster_transaction(clusters, options = {}, &block)
              current_cluster = clusters.shift
              current_cluster.shards_transaction([], options) do
                if clusters.present?
                  recursive_cluster_transaction(clusters, options, &block)
                else
                  yield
                end
              end
            end
          end
        EOD
      end
    end

    # Instance-level shortcut for the class-level +shards_transaction+.
    #
    # @param [Hash] options forwarded unchanged
    def shards_transaction(options = {}, &block)
      self.class.shards_transaction(options, &block)
    end

    # @return [ActiveRecord::Turntable::Shard] current shard for self
    def turntable_shard
      turntable_cluster.shard_for(self.send(turntable_shard_key))
    end

    # Runs the block with the class connection pinned to +shard+.
    #
    # @see ActiveRecord::Turntable::ConnectionProxy#with_shard
    def with_shard(shard)
      self.class.connection.with_shard(shard) { yield }
    end
  end
end