lib/octopus/proxy.rb in ar-octopus-0.0.5 vs lib/octopus/proxy.rb in ar-octopus-0.0.6
- old
+ new
@@ -1,27 +1,19 @@
-require "set"
-
class Octopus::Proxy
- attr_accessor :shards, :block, :current_model, :current_shard, :groups, :current_group, :replicated, :slaves_list, :using_enabled, :last_current_shard
+ attr_accessor :current_model, :current_shard, :current_group, :block, :using_enabled, :last_current_shard
- delegate :increment_open_transactions, :decrement_open_transactions, :to => :select_connection
-
def initialize(config)
+ initialize_shards(config)
+ initialize_replication() if config[Octopus.env()]["replicated"]
+ end
+
+ def initialize_shards(config)
@shards = {}
@groups = {}
- @replicated = config[Octopus.env()]["replicated"]
@shards[:master] = ActiveRecord::Base.connection_pool()
@current_shard = :master
- initialize_shards(config)
-
- if @replicated
- initialize_replication()
- end
- end
-
- def initialize_shards(config)
config[Octopus.env()]["shards"].each do |key, value|
if value.has_key?("adapter")
initialize_adapter(value['adapter'])
@shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection")
else
@@ -36,15 +28,14 @@
end
end
end
def initialize_replication()
- @slaves_list = @shards.keys
- @slaves_list.delete(:master)
- @slaves_list = @slaves_list.map {|sym| sym.to_s}.sort
+ @replicated = true
+ @slaves_list = @shards.keys.map {|sym| sym.to_s}.sort
+ @slaves_list.delete('master')
end
-
def current_shard=(shard_symbol)
if shard_symbol.is_a?(Array)
shard_symbol.each {|symbol| raise "Nonexistent Shard Name: #{symbol}" if @shards[symbol].nil? }
else
@@ -56,34 +47,26 @@
def current_group=(group_symbol)
if group_symbol.is_a?(Array)
group_symbol.each {|symbol| raise "Nonexistent Group Name: #{symbol}" if @groups[symbol].nil? }
else
- raise "Nonexistent Group Name: #{group_symbol}" if @groups[group_symbol].nil? && !group_symbol.nil?
+ raise "Nonexistent Group Name: #{group_symbol}" if @groups[group_symbol].nil?
end
@current_group = group_symbol
end
def current_model=(model)
- if model.is_a?(ActiveRecord::Base)
- @current_model = model.class
- else
- @current_model = model
- end
+ @current_model = model.is_a?(ActiveRecord::Base) ? model.class : model
end
def select_connection()
@shards[shard_name].connection()
end
def shard_name
- if(current_shard.is_a?(Array))
- current_shard.first
- else
- current_shard
- end
+ current_shard.is_a?(Array) ? current_shard.first : current_shard
end
def add_transaction_record(record)
if !select_connection().instance_variable_get(:@_current_transaction_records).nil?
select_connection().add_transaction_record(record)
@@ -95,11 +78,11 @@
self.send_transaction_to_multiple_shards(current_shard, options, &block)
elsif should_send_queries_to_multiple_groups?
self.send_transaction_to_multiple_groups(options, &block)
elsif should_send_queries_to_a_group_of_shards?
self.send_transaction_to_multiple_shards(@groups[current_group], options, &block)
- self.current_group = nil
+ @current_group = nil
else
select_connection.transaction(options, &block)
end
end
@@ -121,14 +104,15 @@
else
select_connection().send(method, *args, &block)
end
end
- def run_query_on_shard(shard, &block)
+ def run_queries_on_shard(shard, &block)
older_shard = self.current_shard
self.block = true
self.current_shard = shard
+
begin
yield
ensure
self.block = false
self.current_shard = older_shard
@@ -140,11 +124,10 @@
ActiveRecord::ConnectionAdapters::ConnectionPool.new(ActiveRecord::Base::ConnectionSpecification.new(adapter, config))
end
def initialize_adapter(adapter)
begin
- require 'rubygems'
gem "activerecord-#{adapter}-adapter"
require "active_record/connection_adapters/#{adapter}_adapter"
rescue LoadError
begin
require "active_record/connection_adapters/#{adapter}_adapter"
@@ -153,11 +136,11 @@
end
end
end
def should_clean_connection?(method)
- method.to_s =~ /insert|select/ && !should_send_queries_to_multiple_shards? && !self.current_group && !replicated
+ method.to_s =~ /insert|select/ && !should_send_queries_to_multiple_shards? && !self.current_group && !@replicated
end
def should_send_queries_to_multiple_shards?
current_shard.is_a?(Array)
end
@@ -197,11 +180,11 @@
def send_queries_to_selected_slave(method, *args, &block)
old_shard = self.current_shard
if current_model.read_inheritable_attribute(:replicated)
if !using_enabled
- self.current_shard = slaves_list.shift.to_sym
- slaves_list << self.current_shard
+ self.current_shard = @slaves_list.shift.to_sym
+ @slaves_list << self.current_shard
end
else
self.current_shard = :master
end