lib/octopus/proxy.rb in ar-octopus-0.8.4 vs lib/octopus/proxy.rb in ar-octopus-0.8.5

- old
+ new

@@ -15,11 +15,11 @@ @shards = HashWithIndifferentAccess.new @shards_slave_groups = HashWithIndifferentAccess.new @slave_groups = HashWithIndifferentAccess.new @groups = {} @adapters = Set.new - @config = ActiveRecord::Base.connection_pool_without_octopus.connection.instance_variable_get(:@config) + @config = ActiveRecord::Base.connection_pool_without_octopus.spec.config unless config.nil? @entire_sharded = config['entire_sharded'] @shards_config = config[Octopus.rails_env] end @@ -200,10 +200,13 @@ # connection pool. Octopus can potentially retain a reference to a closed # connection pool. Previously, that would work since the pool would just # reconnect, but in Rails 3.1 the flag prevents this. def safe_connection(connection_pool) connection_pool.automatic_reconnect ||= true + if !connection_pool.connected? && @shards[:master].connection.query_cache_enabled + connection_pool.connection.enable_query_cache! + end connection_pool.connection end def select_connection safe_connection(@shards[shard_name]) @@ -216,11 +219,11 @@ def should_clean_table_name? @adapters.size > 1 end def run_queries_on_shard(shard, &_block) - keeping_connection_proxy do + keeping_connection_proxy(shard) do using_shard(shard) do yield end end end @@ -231,23 +234,23 @@ end end def clean_connection_proxy self.current_shard = :master + self.current_model = nil self.current_group = nil - self.block = false + self.block = nil end def check_schema_migrations(shard) OctopusModel.using(shard).connection.table_exists?( ActiveRecord::Migrator.schema_migrations_table_name, ) || OctopusModel.using(shard).connection.initialize_schema_migrations_table end def transaction(options = {}, &block) - replicated = @replicated && (current_model.replicated || fully_replicated?) - if !sharded && replicated + if !sharded && current_model_replicated? run_queries_on_shard(:master) do select_connection.transaction(options, &block) end else select_connection.transaction(options, &block) @@ -279,27 +282,27 @@ @shards[current_shard] end def enable_query_cache! clear_query_cache - @shards.each { |_k, v| safe_connection(v).enable_query_cache! } + with_each_healthy_shard { |v| v.connected? && safe_connection(v).enable_query_cache! } end def disable_query_cache! - @shards.each { |_k, v| safe_connection(v).disable_query_cache! } + with_each_healthy_shard { |v| v.connected? && safe_connection(v).disable_query_cache! } end def clear_query_cache - @shards.each { |_k, v| safe_connection(v).clear_query_cache } + with_each_healthy_shard { |v| v.connected? && safe_connection(v).clear_query_cache } end def clear_active_connections! - @shards.each { |_k, v| v.release_connection } + with_each_healthy_shard(&:release_connection) end def clear_all_connections! - @shards.each { |_k, v| v.disconnect! } + with_each_healthy_shard(&:disconnect!) end def connected? @shards.any? { |_k, v| v.connected? } end @@ -320,10 +323,48 @@ send_queries_to_balancer(@slave_groups[current_slave_group], method, *args, &block) end protected + # Ensure that a single failing slave doesn't take down the entire application + def with_each_healthy_shard + @shards.each do |shard_name, v| + begin + yield(v) + rescue => e + if Octopus.robust_environment? + Octopus.logger.error "Error on shard #{shard_name}: #{e.message}" + else + raise + end + end + end + + conn_handler = ActiveRecord::Base.connection_handler + if conn_handler.respond_to?(:connection_pool_list) + # Rails 4+ + ar_pools = conn_handler.connection_pool_list + else + # Rails 3.2 + ar_pools = conn_handler.connection_pools.values + end + + ar_pools.each do |pool| + next if pool == @shards[:master] # Already handled this + + begin + yield(pool) + rescue => e + if Octopus.robust_environment? + Octopus.logger.error "Error on pool (spec: #{pool.spec}): #{e.message}" + else + raise + end + end + end + end + def connection_pool_for(adapter, config) if Octopus.rails4? arg = ActiveRecord::ConnectionAdapters::ConnectionSpecification.new(adapter.dup, config) else arg = ActiveRecord::Base::ConnectionSpecification.new(adapter.dup, config) @@ -354,18 +395,22 @@ HashWithIndifferentAccess.new(resolver.spec.config) end end def should_clean_connection_proxy?(method) - method.to_s =~ /insert|select|execute/ && !@replicated && !block + method.to_s =~ /insert|select|execute/ && !current_model_replicated? && (!block || block != current_shard) end # Try to use slaves if and only if `replicated: true` is specified in `shards.yml` and no slaves groups are defined def should_send_queries_to_replicated_databases?(method) @replicated && method.to_s =~ /select/ && !block && !slaves_grouped? end + def current_model_replicated? + @replicated && (current_model.try(:replicated) || fully_replicated?) + end + def send_queries_to_selected_slave(method, *args, &block) if current_model.replicated || fully_replicated? selected_slave = @slaves_load_balancer.next else selected_slave = :master @@ -382,11 +427,11 @@ # all the model is `replicated()` # (3) It's a SELECT query # while ensuring that we revert `current_shard` from the selected slave to the (shard's) master # not to make queries other than SELECT leak to the slave. def should_use_slaves_for_method?(method) - @replicated && (current_model.replicated || fully_replicated?) && method.to_s =~ /select/ + current_model_replicated? && method.to_s =~ /select/ end def slaves_grouped? @slave_groups.present? end @@ -407,26 +452,28 @@ # Temporarily block cleaning connection proxy and run the block # # @see Octopus::Proxy#should_clean_connection? # @see Octopus::Proxy#clean_connection_proxy - def keeping_connection_proxy(&_block) + def keeping_connection_proxy(shard, &_block) last_block = block begin - self.block = true + self.block = shard yield ensure - self.block = last_block || false + self.block = last_block || nil end end # Temporarily switch `current_shard` and run the block def using_shard(shard, &_block) older_shard = current_shard begin - self.current_shard = shard + unless current_model && !current_model.allowed_shard?(shard) + self.current_shard = shard + end yield ensure self.current_shard = older_shard end end