lib/octopus/proxy.rb in ar-octopus-0.0.20 vs lib/octopus/proxy.rb in ar-octopus-0.0.21
- old
+ new
@@ -68,64 +68,56 @@
end
def shard_name
current_shard.is_a?(Array) ? current_shard.first : current_shard
end
+
+ def run_queries_on_shard(shard, &block)
+ older_shard = self.current_shard
+ self.block = true
+ self.current_shard = shard
- def add_transaction_record(record)
- if !select_connection().instance_variable_get(:@_current_transaction_records).nil?
- select_connection().add_transaction_record(record)
+ begin
+ yield
+ ensure
+ self.block = false
+ self.current_shard = older_shard
end
end
-
- def transaction(options = {}, &block)
- if should_send_queries_to_multiple_shards?
- self.send_transaction_to_multiple_shards(current_shard, options, &block)
- elsif should_send_queries_to_multiple_groups?
- self.send_transaction_to_multiple_groups(options, &block)
- @current_group = nil
- elsif should_send_queries_to_a_group_of_shards?
- self.send_transaction_to_multiple_shards(@groups[current_group], options, &block)
- @current_group = nil
- else
- select_connection.transaction(options, &block)
+
+ def send_queries_to_multiple_shards(shards, &block)
+ shards.each do |shard|
+ self.run_queries_on_shard(shard, &block)
end
end
+
+ def clean_proxy()
+ @using_enabled = nil
+ @current_shard = :master
+ @current_group = nil
+ @block = false
+ end
+
+ def check_schema_migrations(shard)
+ if !ActiveRecord::Base.using(shard).connection.table_exists?(ActiveRecord::Migrator.schema_migrations_table_name())
+ ActiveRecord::Base.using(shard).connection.initialize_schema_migrations_table
+ end
+ end
def method_missing(method, *args, &block)
if should_clean_connection?(method)
conn = select_connection()
self.last_current_shard = self.current_shard
- self.current_shard = :master
- @using_enabled = nil
+ clean_proxy()
conn.send(method, *args, &block)
elsif should_send_queries_to_replicated_databases?(method)
send_queries_to_selected_slave(method, *args, &block)
- elsif should_send_queries_to_multiple_groups?
- send_queries_to_multiple_groups(method, *args, &block)
- elsif should_send_queries_to_multiple_shards?
- send_queries_to_shards(current_shard, method, *args, &block)
- elsif should_send_queries_to_a_group_of_shards?
- send_queries_to_shards(@groups[current_group], method, *args, &block)
else
select_connection().send(method, *args, &block)
end
end
- def run_queries_on_shard(shard, &block)
- older_shard = self.current_shard
- self.block = true
- self.current_shard = shard
-
- begin
- yield
- ensure
- self.block = false
- self.current_shard = older_shard
- end
- end
-
protected
def connection_pool_for(adapter, config)
ActiveRecord::ConnectionAdapters::ConnectionPool.new(ActiveRecord::Base::ConnectionSpecification.new(adapter, config))
end
@@ -141,53 +133,21 @@
end
end
end
def should_clean_connection?(method)
- method.to_s =~ /insert|select|execute/ && !should_send_queries_to_multiple_shards? && !self.current_group && !@replicated && !self.block
+ method.to_s =~ /insert|select|execute/ && !self.current_group && !@replicated && !self.block
end
- def should_send_queries_to_multiple_shards?
- current_shard.is_a?(Array)
- end
-
- def should_send_queries_to_multiple_groups?
- current_group.is_a?(Array)
- end
-
- def should_send_queries_to_a_group_of_shards?
- !current_group.nil?
- end
-
def should_send_queries_to_replicated_databases?(method)
@replicated && method.to_s =~ /select/
end
def have_config_for_enviroment?(config)
!config[Octopus.env()].nil?
end
- def send_queries_to_multiple_groups(method, *args, &block)
- method_return = nil
-
- current_group.each do |group_symbol|
- method_return = self.send_queries_to_shards(@groups[group_symbol], method, *args, &block)
- end
-
- return method_return
- end
-
- def send_queries_to_shards(shard_array, method, *args, &block)
- method_return = nil
-
- shard_array.each do |shard_symbol|
- method_return = @shards[shard_symbol].connection().send(method, *args, &block)
- end
-
- return method_return
- end
-
def send_queries_to_selected_slave(method, *args, &block)
old_shard = self.current_shard
if current_model.read_inheritable_attribute(:replicated) || @entire_replicated
if !using_enabled
@@ -200,19 +160,7 @@
sql = select_connection().send(method, *args, &block)
self.current_shard = old_shard
@using_enabled = nil
return sql
- end
-
- def send_transaction_to_multiple_shards(shard_array, options, &block)
- shard_array.each do |shard_symbol|
- @shards[shard_symbol].connection().transaction(options, &block)
- end
- end
-
- def send_transaction_to_multiple_groups(options, &block)
- current_group.each do |group_symbol|
- self.send_transaction_to_multiple_shards(@groups[group_symbol], options, &block)
- end
end
end