lib/mongo/collection/view/map_reduce.rb in mongo-2.1.0.beta vs lib/mongo/collection/view/map_reduce.rb in mongo-2.1.0.rc0

- old
+ new

@@ -23,10 +23,11 @@ class MapReduce extend Forwardable include Enumerable include Immutable include Iterable + include Loggable # @return [ View ] view The collection view. attr_reader :view # @return [ String ] map The map function. @@ -156,46 +157,50 @@ :read => read, :selector => { :mapreduce => collection.name, :map => map, :reduce => reduce, - :query => view.selector[:$query] || view.selector, + :query => view.modifiers[:$query] || view.selector, :out => { inline: 1 } - }.merge(options).merge(view.options) + }.merge(options).merge(view_options) } end + def view_options + view.sort ? view.options.merge(:sort => view.sort) : view.options + end + def new(options) MapReduce.new(view, map, reduce, options) end def initial_query_op Operation::MapReduce.new(map_reduce_spec) end + def valid_server?(server) + server.standalone? || server.mongos? || server.primary? || secondary_ok? + end + + def secondary_ok? + out.respond_to?(:keys) && + out.keys.first.to_s.downcase == 'inline' + end + def send_initial_query(server) - result = - begin - initial_query_op.execute(server.context) - rescue Mongo::Error::NeedPrimaryServer - log_warn([ - 'Rerouting the MapReduce operation to the primary server.' - ]) - server = ServerSelector.get(mode: :primary).select_server(cluster) - initial_query_op.execute(server.context) - end - if inline? - result - else - send_fetch_query(server) + unless valid_server?(server) + log_warn('Rerouting the MapReduce operation to the primary server.') + server = cluster.next_primary end + result = initial_query_op.execute(server.context) + inline? ? result : send_fetch_query(server) end def fetch_query_spec { :selector => {}, :options => {}, :db_name => database.name, - :coll_name => out.values.first } + :coll_name => out.respond_to?(:keys) ? out.values.first : out } end def fetch_query_op Operation::Read::Query.new(fetch_query_spec) end