lib/octopus/proxy.rb in ar-octopus-0.4.0 vs lib/octopus/proxy.rb in ar-octopus-0.5.0
- old
+ new
@@ -1,48 +1,55 @@
require "set"
class Octopus::Proxy
- attr_accessor :current_model, :current_shard, :current_group, :block, :using_enabled, :last_current_shard, :config
+ attr_accessor :config
- def initialize(config)
+ def initialize(config = Octopus.config)
initialize_shards(config)
initialize_replication(config) if !config.nil? && config["replicated"]
end
def initialize_shards(config)
@shards = HashWithIndifferentAccess.new
- @groups = HashWithIndifferentAccess.new
+ @groups = {}
@adapters = Set.new
- @shards[:master] = ActiveRecord::Base.connection_pool()
- @config = ActiveRecord::Base.connection_pool.connection.instance_variable_get(:@config)
- @current_shard = :master
-
+ @shards[:master] = ActiveRecord::Base.connection_pool_without_octopus()
+ @config = ActiveRecord::Base.connection_pool_without_octopus.connection.instance_variable_get(:@config)
+
if !config.nil? && config.has_key?("verify_connection")
@verify_connection = config["verify_connection"]
else
@verify_connection = false
end
-
+
if !config.nil?
- @entire_sharded = config['entire_sharded']
- shards_config = config[Octopus.rails_env()]
+ @entire_sharded = config['entire_sharded']
+ shards_config = config[Octopus.rails_env()]
end
-
+
shards_config ||= []
shards_config.each do |key, value|
- if value.has_key?("adapter")
+ if value.is_a?(String) && Octopus.rails32?
+ value = resolve_string_connection(value).merge(:octopus_shard => key)
initialize_adapter(value['adapter'])
@shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection")
- else
- @groups[key.to_sym] = []
+ elsif value.is_a?(Hash) && value.has_key?("adapter")
+ value.merge!(:octopus_shard => key)
+ initialize_adapter(value['adapter'])
+ @shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection")
+ elsif value.is_a?(Hash)
+ @groups[key.to_s] = []
value.each do |k, v|
raise "You have duplicated shard names!" if @shards.has_key?(k.to_sym)
+
initialize_adapter(v['adapter'])
- @shards[k.to_sym] = connection_pool_for(v, "#{v['adapter']}_connection")
- @groups[key.to_sym] << k.to_sym
+ config_with_octopus_shard = v.merge(:octopus_shard => k)
+
+ @shards[k.to_sym] = connection_pool_for(config_with_octopus_shard, "#{v['adapter']}_connection")
+ @groups[key.to_s] << k.to_sym
end
end
end
end
@@ -51,51 +58,111 @@
if config.has_key?("fully_replicated")
@fully_replicated = config["fully_replicated"]
else
@fully_replicated = true
end
- @slaves_list = @shards.keys.map {|sym| sym.to_s}.sort
- @slaves_list.delete('master')
+ @slaves_list = @shards.keys.map {|sym| sym.to_s}.sort
+ @slaves_list.delete('master')
+ @slave_index = 0
end
+ def current_model
+ Thread.current["octopus.current_model"]
+ end
+
+ def current_model=(model)
+ Thread.current["octopus.current_model"] = model.is_a?(ActiveRecord::Base) ? model.class : model
+ end
+
+ def current_shard
+ Thread.current["octopus.current_shard"] ||= :master
+ end
+
def current_shard=(shard_symbol)
if shard_symbol.is_a?(Array)
shard_symbol.each {|symbol| raise "Nonexistent Shard Name: #{symbol}" if @shards[symbol].nil? }
else
raise "Nonexistent Shard Name: #{shard_symbol}" if @shards[shard_symbol].nil?
end
- @current_shard = shard_symbol
+ Thread.current["octopus.current_shard"] = shard_symbol
end
+ def current_group
+ Thread.current["octopus.current_group"]
+ end
+
def current_group=(group_symbol)
- if group_symbol.is_a?(Array)
- group_symbol.each {|symbol| raise "Nonexistent Group Name: #{symbol}" if @groups[symbol].nil? }
- else
- raise "Nonexistent Group Name: #{group_symbol}" if @groups[group_symbol].nil?
+ # TODO: Error message should include all groups if given more than one bad name.
+ [group_symbol].flatten.compact.each do |group|
+ raise "Nonexistent Group Name: #{group}" unless has_group?(group)
end
- @current_group = group_symbol
+ Thread.current["octopus.current_group"] = group_symbol
end
- def current_model=(model)
- @current_model = model.is_a?(ActiveRecord::Base) ? model.class : model
+ def block
+ Thread.current["octopus.block"]
end
- def select_connection()
- @shards[shard_name].verify_active_connections! if @verify_connection
+ def block=(block)
+ Thread.current["octopus.block"] = block
+ end
+
+ def last_current_shard
+ Thread.current["octopus.last_current_shard"]
+ end
+
+ def last_current_shard=(last_current_shard)
+ Thread.current["octopus.last_current_shard"] = last_current_shard
+ end
+
+ # Public: Whether or not a group exists with the given name converted to a
+ # string.
+ #
+ # Returns a boolean.
+ def has_group?(group)
+ @groups.has_key?(group.to_s)
+ end
+
+ # Public: Retrieves names of all loaded shards.
+ #
+ # Returns an array of shard names as symbols
+ def shard_names
+ @shards.keys
+ end
+
+ # Public: Retrieves the defined shards for a given group.
+ #
+ # Returns an array of shard names as symbols or nil if the group is not
+ # defined.
+ def shards_for_group(group)
+ @groups.fetch(group.to_s, nil)
+ end
+
+ def select_connection
+ @shards[shard_name].verify_active_connections! if @verify_connection
+ # Rails 3.1 sets automatic_reconnect to false when it removes
+ # connection pool. Octopus can potentially retain a reference to a closed
+ # connection pool. Previously, that would work since the pool would just
+ # reconnect, but in Rails 3.1 the flag prevents this.
+ if Octopus.rails31? || Octopus.rails32?
+ if !@shards[shard_name].automatic_reconnect
+ @shards[shard_name].automatic_reconnect = true
+ end
+ end
@shards[shard_name].connection()
end
def shard_name
current_shard.is_a?(Array) ? current_shard.first : current_shard
end
-
+
def should_clean_table_name?
@adapters.size > 1
end
-
+
def run_queries_on_shard(shard, &block)
older_shard = self.current_shard
last_block = self.block
begin
@@ -105,32 +172,31 @@
ensure
self.block = last_block || false
self.current_shard = older_shard
end
end
-
+
def send_queries_to_multiple_shards(shards, &block)
shards.each do |shard|
self.run_queries_on_shard(shard, &block)
end
end
-
+
def clean_proxy()
- @using_enabled = nil
- @current_shard = :master
- @current_group = nil
- @block = false
+ self.current_shard = :master
+ self.current_group = nil
+ self.block = false
end
-
+
def check_schema_migrations(shard)
- if !ActiveRecord::Base.using(shard).connection.table_exists?(ActiveRecord::Migrator.schema_migrations_table_name())
- ActiveRecord::Base.using(shard).connection.initialize_schema_migrations_table
+ if !OctopusModel.using(shard).connection.table_exists?(ActiveRecord::Migrator.schema_migrations_table_name())
+ OctopusModel.using(shard).connection.initialize_schema_migrations_table
end
end
-
+
def transaction(options = {}, &block)
- if @replicated && (current_model.read_inheritable_attribute(:replicated) || @fully_replicated)
+ if @replicated && (current_model.replicated || @fully_replicated)
self.run_queries_on_shard(:master) do
select_connection.transaction(options, &block)
end
else
select_connection.transaction(options, &block)
@@ -142,23 +208,27 @@
conn = select_connection()
self.last_current_shard = self.current_shard
clean_proxy()
conn.send(method, *args, &block)
elsif should_send_queries_to_replicated_databases?(method)
- send_queries_to_selected_slave(method, *args, &block)
+ send_queries_to_selected_slave(method, *args, &block)
else
select_connection().send(method, *args, &block)
end
end
def respond_to?(method, include_private = false)
super || select_connection.respond_to?(method, include_private)
end
+ def connection_pool
+ return @shards[current_shard]
+ end
+
protected
def connection_pool_for(adapter, config)
- ActiveRecord::ConnectionAdapters::ConnectionPool.new(ActiveRecord::Base::ConnectionSpecification.new(adapter, config))
+ ActiveRecord::ConnectionAdapters::ConnectionPool.new(ActiveRecord::Base::ConnectionSpecification.new(adapter.dup, config))
end
def initialize_adapter(adapter)
@adapters << adapter
begin
@@ -166,32 +236,33 @@
rescue LoadError
raise "Please install the #{adapter} adapter: `gem install activerecord-#{adapter}-adapter` (#{$!})"
end
end
+ def resolve_string_connection(spec)
+ ActiveRecord::Base::ConnectionSpecification::Resolver.new(spec, {}).spec.config.stringify_keys
+ end
+
def should_clean_connection?(method)
method.to_s =~ /insert|select|execute/ && !@replicated && !self.block
end
def should_send_queries_to_replicated_databases?(method)
- @replicated && method.to_s =~ /select/ && !@block
+ @replicated && method.to_s =~ /select/ && !self.block
end
- def send_queries_to_selected_slave(method, *args, &block)
+ def send_queries_to_selected_slave(method, *args, &block)
old_shard = self.current_shard
-
+
begin
- if current_model.read_inheritable_attribute(:replicated) || @fully_replicated
- self.current_shard = @slaves_list.shift.to_sym
- @slaves_list << self.current_shard
+ if current_model.replicated || @fully_replicated
+ self.current_shard = @slaves_list[@slave_index = (@slave_index + 1) % @slaves_list.length]
else
self.current_shard = :master
end
-
- sql = select_connection().send(method, *args, &block)
- return sql
+
+ select_connection.send(method, *args, &block)
ensure
self.current_shard = old_shard
- @using_enabled = nil
end
end
end