lib/octopus/proxy.rb in ar-octopus-0.8.4 vs lib/octopus/proxy.rb in ar-octopus-0.8.5
- old
+ new
@@ -15,11 +15,11 @@
@shards = HashWithIndifferentAccess.new
@shards_slave_groups = HashWithIndifferentAccess.new
@slave_groups = HashWithIndifferentAccess.new
@groups = {}
@adapters = Set.new
- @config = ActiveRecord::Base.connection_pool_without_octopus.connection.instance_variable_get(:@config)
+ @config = ActiveRecord::Base.connection_pool_without_octopus.spec.config
unless config.nil?
@entire_sharded = config['entire_sharded']
@shards_config = config[Octopus.rails_env]
end
@@ -200,10 +200,13 @@
# connection pool. Octopus can potentially retain a reference to a closed
# connection pool. Previously, that would work since the pool would just
# reconnect, but in Rails 3.1 the flag prevents this.
def safe_connection(connection_pool)
connection_pool.automatic_reconnect ||= true
+ if !connection_pool.connected? && @shards[:master].connection.query_cache_enabled
+ connection_pool.connection.enable_query_cache!
+ end
connection_pool.connection
end
def select_connection
safe_connection(@shards[shard_name])
@@ -216,11 +219,11 @@
def should_clean_table_name?
@adapters.size > 1
end
def run_queries_on_shard(shard, &_block)
- keeping_connection_proxy do
+ keeping_connection_proxy(shard) do
using_shard(shard) do
yield
end
end
end
@@ -231,23 +234,23 @@
end
end
def clean_connection_proxy
self.current_shard = :master
+ self.current_model = nil
self.current_group = nil
- self.block = false
+ self.block = nil
end
def check_schema_migrations(shard)
OctopusModel.using(shard).connection.table_exists?(
ActiveRecord::Migrator.schema_migrations_table_name,
) || OctopusModel.using(shard).connection.initialize_schema_migrations_table
end
def transaction(options = {}, &block)
- replicated = @replicated && (current_model.replicated || fully_replicated?)
- if !sharded && replicated
+ if !sharded && current_model_replicated?
run_queries_on_shard(:master) do
select_connection.transaction(options, &block)
end
else
select_connection.transaction(options, &block)
@@ -279,27 +282,27 @@
@shards[current_shard]
end
def enable_query_cache!
clear_query_cache
- @shards.each { |_k, v| safe_connection(v).enable_query_cache! }
+ with_each_healthy_shard { |v| v.connected? && safe_connection(v).enable_query_cache! }
end
def disable_query_cache!
- @shards.each { |_k, v| safe_connection(v).disable_query_cache! }
+ with_each_healthy_shard { |v| v.connected? && safe_connection(v).disable_query_cache! }
end
def clear_query_cache
- @shards.each { |_k, v| safe_connection(v).clear_query_cache }
+ with_each_healthy_shard { |v| v.connected? && safe_connection(v).clear_query_cache }
end
def clear_active_connections!
- @shards.each { |_k, v| v.release_connection }
+ with_each_healthy_shard(&:release_connection)
end
def clear_all_connections!
- @shards.each { |_k, v| v.disconnect! }
+ with_each_healthy_shard(&:disconnect!)
end
def connected?
@shards.any? { |_k, v| v.connected? }
end
@@ -320,10 +323,48 @@
send_queries_to_balancer(@slave_groups[current_slave_group], method, *args, &block)
end
protected
+ # Ensure that a single failing slave doesn't take down the entire application
+ def with_each_healthy_shard
+ @shards.each do |shard_name, v|
+ begin
+ yield(v)
+ rescue => e
+ if Octopus.robust_environment?
+ Octopus.logger.error "Error on shard #{shard_name}: #{e.message}"
+ else
+ raise
+ end
+ end
+ end
+
+ conn_handler = ActiveRecord::Base.connection_handler
+ if conn_handler.respond_to?(:connection_pool_list)
+ # Rails 4+
+ ar_pools = conn_handler.connection_pool_list
+ else
+ # Rails 3.2
+ ar_pools = conn_handler.connection_pools.values
+ end
+
+ ar_pools.each do |pool|
+ next if pool == @shards[:master] # Already handled this
+
+ begin
+ yield(pool)
+ rescue => e
+ if Octopus.robust_environment?
+ Octopus.logger.error "Error on pool (spec: #{pool.spec}): #{e.message}"
+ else
+ raise
+ end
+ end
+ end
+ end
+
def connection_pool_for(adapter, config)
if Octopus.rails4?
arg = ActiveRecord::ConnectionAdapters::ConnectionSpecification.new(adapter.dup, config)
else
arg = ActiveRecord::Base::ConnectionSpecification.new(adapter.dup, config)
@@ -354,18 +395,22 @@
HashWithIndifferentAccess.new(resolver.spec.config)
end
end
def should_clean_connection_proxy?(method)
- method.to_s =~ /insert|select|execute/ && !@replicated && !block
+ method.to_s =~ /insert|select|execute/ && !current_model_replicated? && (!block || block != current_shard)
end
# Try to use slaves if and only if `replicated: true` is specified in `shards.yml` and no slaves groups are defined
def should_send_queries_to_replicated_databases?(method)
@replicated && method.to_s =~ /select/ && !block && !slaves_grouped?
end
+ def current_model_replicated?
+ @replicated && (current_model.try(:replicated) || fully_replicated?)
+ end
+
def send_queries_to_selected_slave(method, *args, &block)
if current_model.replicated || fully_replicated?
selected_slave = @slaves_load_balancer.next
else
selected_slave = :master
@@ -382,11 +427,11 @@
# all the model is `replicated()`
# (3) It's a SELECT query
# while ensuring that we revert `current_shard` from the selected slave to the (shard's) master
# not to make queries other than SELECT leak to the slave.
def should_use_slaves_for_method?(method)
- @replicated && (current_model.replicated || fully_replicated?) && method.to_s =~ /select/
+ current_model_replicated? && method.to_s =~ /select/
end
def slaves_grouped?
@slave_groups.present?
end
@@ -407,26 +452,28 @@
# Temporarily block cleaning connection proxy and run the block
#
# @see Octopus::Proxy#should_clean_connection?
# @see Octopus::Proxy#clean_connection_proxy
- def keeping_connection_proxy(&_block)
+ def keeping_connection_proxy(shard, &_block)
last_block = block
begin
- self.block = true
+ self.block = shard
yield
ensure
- self.block = last_block || false
+ self.block = last_block || nil
end
end
# Temporarily switch `current_shard` and run the block
def using_shard(shard, &_block)
older_shard = current_shard
begin
- self.current_shard = shard
+ unless current_model && !current_model.allowed_shard?(shard)
+ self.current_shard = shard
+ end
yield
ensure
self.current_shard = older_shard
end
end