lib/octopus/proxy.rb in ar-octopus-0.0.20 vs lib/octopus/proxy.rb in ar-octopus-0.0.21

- old
+ new

@@ -68,64 +68,56 @@ end def shard_name current_shard.is_a?(Array) ? current_shard.first : current_shard end + + def run_queries_on_shard(shard, &block) + older_shard = self.current_shard + self.block = true + self.current_shard = shard - def add_transaction_record(record) - if !select_connection().instance_variable_get(:@_current_transaction_records).nil? - select_connection().add_transaction_record(record) + begin + yield + ensure + self.block = false + self.current_shard = older_shard end end - - def transaction(options = {}, &block) - if should_send_queries_to_multiple_shards? - self.send_transaction_to_multiple_shards(current_shard, options, &block) - elsif should_send_queries_to_multiple_groups? - self.send_transaction_to_multiple_groups(options, &block) - @current_group = nil - elsif should_send_queries_to_a_group_of_shards? - self.send_transaction_to_multiple_shards(@groups[current_group], options, &block) - @current_group = nil - else - select_connection.transaction(options, &block) + + def send_queries_to_multiple_shards(shards, &block) + shards.each do |shard| + self.run_queries_on_shard(shard, &block) end end + + def clean_proxy() + @using_enabled = nil + @current_shard = :master + @current_group = nil + @block = false + end + + def check_schema_migrations(shard) + if !ActiveRecord::Base.using(shard).connection.table_exists?(ActiveRecord::Migrator.schema_migrations_table_name()) + ActiveRecord::Base.using(shard).connection.initialize_schema_migrations_table + end + end def method_missing(method, *args, &block) if should_clean_connection?(method) conn = select_connection() self.last_current_shard = self.current_shard - self.current_shard = :master - @using_enabled = nil + clean_proxy() conn.send(method, *args, &block) elsif should_send_queries_to_replicated_databases?(method) send_queries_to_selected_slave(method, *args, &block) - elsif should_send_queries_to_multiple_groups? - send_queries_to_multiple_groups(method, *args, &block) - elsif should_send_queries_to_multiple_shards? - send_queries_to_shards(current_shard, method, *args, &block) - elsif should_send_queries_to_a_group_of_shards? - send_queries_to_shards(@groups[current_group], method, *args, &block) else select_connection().send(method, *args, &block) end end - def run_queries_on_shard(shard, &block) - older_shard = self.current_shard - self.block = true - self.current_shard = shard - - begin - yield - ensure - self.block = false - self.current_shard = older_shard - end - end - protected def connection_pool_for(adapter, config) ActiveRecord::ConnectionAdapters::ConnectionPool.new(ActiveRecord::Base::ConnectionSpecification.new(adapter, config)) end @@ -141,53 +133,21 @@ end end end def should_clean_connection?(method) - method.to_s =~ /insert|select|execute/ && !should_send_queries_to_multiple_shards? && !self.current_group && !@replicated && !self.block + method.to_s =~ /insert|select|execute/ && !self.current_group && !@replicated && !self.block end - def should_send_queries_to_multiple_shards? - current_shard.is_a?(Array) - end - - def should_send_queries_to_multiple_groups? - current_group.is_a?(Array) - end - - def should_send_queries_to_a_group_of_shards? - !current_group.nil? - end - def should_send_queries_to_replicated_databases?(method) @replicated && method.to_s =~ /select/ end def have_config_for_enviroment?(config) !config[Octopus.env()].nil? end - def send_queries_to_multiple_groups(method, *args, &block) - method_return = nil - - current_group.each do |group_symbol| - method_return = self.send_queries_to_shards(@groups[group_symbol], method, *args, &block) - end - - return method_return - end - - def send_queries_to_shards(shard_array, method, *args, &block) - method_return = nil - - shard_array.each do |shard_symbol| - method_return = @shards[shard_symbol].connection().send(method, *args, &block) - end - - return method_return - end - def send_queries_to_selected_slave(method, *args, &block) old_shard = self.current_shard if current_model.read_inheritable_attribute(:replicated) || @entire_replicated if !using_enabled @@ -200,19 +160,7 @@ sql = select_connection().send(method, *args, &block) self.current_shard = old_shard @using_enabled = nil return sql - end - - def send_transaction_to_multiple_shards(shard_array, options, &block) - shard_array.each do |shard_symbol| - @shards[shard_symbol].connection().transaction(options, &block) - end - end - - def send_transaction_to_multiple_groups(options, &block) - current_group.each do |group_symbol| - self.send_transaction_to_multiple_shards(@groups[group_symbol], options, &block) - end end end