Sha256: 2f3b544105de86876fdb36ad95850681c8a38983e8e42ab80954ddc001ec319a
Contents?: true
Size: 1.37 KB
Versions: 1
Compression:
Stored size: 1.37 KB
Contents
module Querrel
  # Map/reduce helpers: run a scope (or a block built on top of it) once per
  # configured database and then merge the per-database results.
  #
  # The including object must provide the state this module reads:
  # +@options+ (a Hash of default options), +@connection_resolver+ (responds
  # to +configurations+) and a +retrieve_connection_config(db, resolver)+
  # method.
  module MapReduce
    # Pool size handed to StaticPool when the +:threads+ option is absent.
    DEFAULT_THREADS = 20

    # Maps +scope+ over every database and flattens the buckets into a
    # single collection.
    #
    # @param scope   [Object] forwarded to #map unchanged
    # @param options [Hash]   forwarded to #map unchanged
    # @yield [scope, db] optional per-database block, see #map
    # @return [Array] the result of #reduce applied to the #map buckets
    def query(scope, options = {}, &blk)
      reduce(map(scope, options, &blk))
    end

    # Evaluates +scope+ once per database and collects the results by
    # database key.
    #
    # Recognised options (merged over +@options+):
    # * +:on+       - when the key is present, a fresh ConnectionResolver is
    #                 built from its value instead of using
    #                 +@connection_resolver+
    # * +:db_names+ - coerced to a boolean and passed as the second
    #                 ConnectionResolver argument (meaning defined by that
    #                 class - TODO confirm)
    # * +:threads+  - StaticPool size, defaults to DEFAULT_THREADS
    #
    # @param scope   [Object] typically an ActiveRecord::Relation
    # @param options [Hash]
    # @yield [scope, db] optional; its return value becomes the bucket for
    #   +db+. Without a block the (per-database) scope itself is the bucket.
    # @return [Hash] database key => result for that database
    def map(scope, options = {}, &blk)
      options = @options.merge(options)
      resolver =
        if options.key?(:on)
          ConnectionResolver.new(options[:on], !!options[:db_names])
        else
          @connection_resolver
        end
      dbs = resolver.configurations.keys

      results = {}
      results_semaphore = Mutex.new
      pool = StaticPool.new(options[:threads] || DEFAULT_THREADS)

      dbs.each do |db|
        pool.enqueue do
          # Each job installs its own connection handler before connecting.
          # NOTE(review): presumably this relies on the handler being scoped
          # to the current thread - confirm for the ActiveRecord version used.
          handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new
          ActiveRecord::Base.connection_handler = handler

          begin
            # Connecting happens inside the protected region so the ensure
            # below also runs when establishing the connection fails.
            ActiveRecord::Base.establish_connection(retrieve_connection_config(db, resolver))

            # A relation memoizes its records after the first load. Sharing
            # one relation between all databases would make every bucket
            # point at the same object (and the same rows), so each database
            # gets its own unloaded copy.
            db_scope = scope.is_a?(ActiveRecord::Relation) ? scope.dup : scope

            local_results = block_given? ? yield(db_scope, db) : db_scope
            # Force the query to run while this database's connection is
            # active and mark the loaded records read-only.
            local_results.to_a.each(&:readonly!) if local_results.is_a?(ActiveRecord::Relation)

            # +results+ is written from every pool job; guard the write.
            results_semaphore.synchronize { results[db] = local_results }
          ensure
            ActiveRecord::Base.connection_handler.clear_all_connections!
          end
        end
      end
      # NOTE(review): presumably runs the queued jobs and waits for them to
      # finish before returning - confirm against StaticPool.
      pool.do_your_thang!

      results
    end

    # Concatenates the per-database buckets produced by #map.
    #
    # @param buckets [Hash] database key => result
    # @return [Array] all results in bucket order; array-like results are
    #   flattened one level, anything else is appended as a single element
    def reduce(buckets)
      buckets.flat_map { |_db, results| results }
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
querrel-1.4.0 | lib/querrel/map_reduce.rb |