lib/tasks/switchman.rake in switchman-1.2.22 vs lib/tasks/switchman.rake in switchman-1.2.23

- old
+ new

@@ -1,79 +1,127 @@ module Switchman - def self.shardify_task(task_name) - old_task = Rake::Task[task_name] - old_actions = old_task.actions.dup - old_task.actions.clear + module Rake + def self.filter_database_servers(&block) + chain = filter_database_servers_chain # use a local variable so that the current chain is closed over in the following lambda + @filter_database_servers_chain = lambda { |servers| block.call(servers, chain) } + end - old_task.enhance do - if ::Rails.env.test? - require 'switchman/test_helper' - TestHelper.recreate_persistent_test_shards(dont_create: true) - end + def self.shardify_task(task_name) + old_task = ::Rake::Task[task_name] + old_actions = old_task.actions.dup + old_task.actions.clear - ::Shackles.activate(:deploy) do - - scope = Shard.order("database_server_id IS NOT NULL, database_server_id, id") - if ENV['DATABASE_SERVER'] - servers = ENV['DATABASE_SERVER'] - if servers.first == '-' - negative = true - servers = servers[1..-1] - end - servers = servers.split(',') - conditions = ["database_server_id #{ "NOT " if negative }IN (?)", servers] - conditions.first << " OR database_server_id IS NULL" if servers.include?(::Rails.env) && !negative || !servers.include?(::Rails.env) && negative - scope = scope.where(conditions) + old_task.enhance do + if ::Rails.env.test? + require 'switchman/test_helper' + TestHelper.recreate_persistent_test_shards(dont_create: true) end - if ENV['SHARD'] - raw_shard_ids = ENV['SHARD'].split(',') - shards = [] - default = false - shard_ids = [] - ranges = [] - raw_shard_ids.each do |id| - if id == 'default' - default = true - elsif id =~ /(\d+)?\.\.(\.)?(\d+)?/ - raise "Invalid shard id or range: #{id}" unless $1 || $3 - range = [] - range << "id>=#{$1}" if $1 - range << "id<#{'=' unless $2}#{$3}" if $3 - ranges << "(#{range.join(' AND ')})" - elsif id =~ /\d+/ - shard_ids << id.to_i - else - raise "Invalid shard id or range: #{id}" + ::Shackles.activate(:deploy) do + servers = DatabaseServer.all + + if ENV['DATABASE_SERVER'] + servers = ENV['DATABASE_SERVER'] + if servers.first == '-' + negative = true + servers = servers[1..-1] end + servers = servers.split(',').map { |server| DatabaseServer.find(server) }.compact + servers = DatabaseServer.all - servers if negative end - queries = 0 - default_on_servers = !servers || servers.include?(Shard.default.database_server.id) - default_on_servers = !default_on_servers if negative - if default && default_on_servers - shards << Shard.default - queries += 1 + + servers = filter_database_servers_chain.call(servers) + + scope = Shard.order("database_server_id IS NOT NULL, database_server_id, id") + if servers != DatabaseServer.all + conditions = ["database_server_id IN (?)", servers.map(&:id)] + conditions.first << " OR database_server_id IS NULL" if servers.include?(Shard.default.database_server) + scope = scope.where(conditions) end - shards.concat(scope.where(:id => shard_ids).all) unless shard_ids.empty? - queries += 1 unless shard_ids.empty? - shards.concat(scope.where(ranges.join(" OR ")).all) unless ranges.empty? - queries += 1 unless ranges.empty? - shards = shards.uniq.sort_by { |shard| [shard.database_server.id, shard.id] } if queries > 1 + + if ENV['SHARD'] + scope = shard_scope(scope, ENV['SHARD']) + end + + Shard.with_each_shard(scope, Shard.categories, :parallel => ENV['PARALLEL'].to_i) do + shard = Shard.current + puts "#{shard.id}: #{shard.description}" + ::ActiveRecord::Base.connection_pool.spec.config[:shard_name] = Shard.current.name + ::ActiveRecord::Base.configurations[::Rails.env] = ::ActiveRecord::Base.connection_pool.spec.config.stringify_keys + shard.database_server.unshackle do + old_actions.each(&:call) + end + nil + end end - shards ||= scope + end + end - Shard.with_each_shard(shards, Shard.categories, :parallel => ENV['PARALLEL'].to_i) do - shard = Shard.current - puts "#{shard.id}: #{shard.description}" - ::ActiveRecord::Base.connection_pool.spec.config[:shard_name] = Shard.current.name - ::ActiveRecord::Base.configurations[::Rails.env] = ::ActiveRecord::Base.connection_pool.spec.config.stringify_keys - shard.database_server.unshackle do - old_actions.each(&:call) + %w{db:migrate db:migrate:up db:migrate:down db:rollback}.each { |task_name| shardify_task(task_name) } + + private + + def self.shard_scope(scope, raw_shard_ids) + raw_shard_ids = raw_shard_ids.split(',') + + shard_ids = [] + negative_shard_ids = [] + ranges = [] + negative_ranges = [] + raw_shard_ids.each do |id| + if id == 'default' + shard_ids << Shard.default.id + elsif id == '-default' + negative_shard_ids << Shard.default.id + elsif id =~ /(-?)(\d+)?\.\.(\.)?(\d+)?/ + negative, start, open, finish = $1.present?, $2, $3.present?, $4 + raise "Invalid shard id or range: #{id}" unless start || finish + range = [] + range << "id>=#{start}" if start + range << "id<#{'=' unless open}#{finish}" if finish + (negative ? negative_ranges : ranges) << "(#{range.join(' AND ')})" + elsif id =~ /-(\d+)/ + negative_shard_ids << $1.to_i + elsif id =~ /\d+/ + shard_ids << id.to_i + else + raise "Invalid shard id or range: #{id}" + end + end + + shard_ids.uniq! + negative_shard_ids.uniq! + unless shard_ids.empty? + shard_ids -= negative_shard_ids + if shard_ids.empty? && ranges.empty? + if ::Rails.version < '4' + return scope.where("?", false) + else + return scope.none end - nil end + # we already trimmed them all out; no need to make the server do it as well + negative_shard_ids = [] if ranges.empty? end + + conditions = [] + positive_queries = [] + unless ranges.empty? + positive_queries << ranges.join(" OR ") + end + unless shard_ids.empty? + positive_queries << "id IN (?)" + conditions << shard_ids + end + positive_query = positive_queries.join(" OR ") + scope = scope.where(positive_query, *conditions) unless positive_queries.empty? + + scope = scope.where("NOT (#{negative_ranges.join(" OR")})") unless negative_ranges.empty? + scope = scope.where("id NOT IN (?)", negative_shard_ids) unless negative_shard_ids.empty? + scope end - end - %w{db:migrate db:migrate:up db:migrate:down db:rollback}.each { |task_name| shardify_task(task_name) } + def self.filter_database_servers_chain + @filter_database_servers_chain ||= ->(servers) { servers } + end + end end